下面将介绍如何通过 Java API 接口方式进行 HDFS 的相关操作。
在使用 HDFS 提供的 API 之前,需要先进行 HDFS 初始化操作。初始化 HDFS 时会加载 HDFS 的配置文件,HDFS 使用到的配置文件主要为 core-site.xml 和 hdfs-site.xml 两个文件,可以从 configmap hdfs-config
中得到。
初始化代码样例如下。
private void init() throws IOException {
conf = new Configuration();
// conf path for core-site.xml and hdfs-site.xml
conf.addResource(new Path(PATH_TO_HDFS_SITE_XML));
conf.addResource(new Path(PATH_TO_CORE_SITE_XML));
fSystem = FileSystem.get(conf);
}
在 HDFS 文件系统初始化之后,就可以调用 HDFS 提供的各种 API 进行开发。
如果要在 HDFS 文件系统中创建目录,需要 FileSystem 实例的 exists 方法判断该目录是否已经存在:
创建目录代码样例如下。
/**
* create directory path
*
* @param dirPath
* @return
* @throws java.io.IOException
*/
private boolean createPath(final Path dirPath) throws IOException {
if (!fSystem.exists(dirPath)) {
fSystem.mkdirs(dirPath);
}
return true;
}
通过调用 FileSystem 实例的 create 方法获取写文件的输出流。通常获得输出流之后,可以直接对这个输出流进行写入操作,将内容写入 HDFS 的指定文件中。写完文件后,需要调用 close 方法关闭输出流。
写文件代码样例如下。
/**
* 创建文件,写文件
*
* @throws java.io.IOException
*/
private void createAndWrite() throws IOException {
final String content = "Hello HDFS!";
FSDataOutputStream out = null;
try {
out = fSystem.create(new Path(DEST_PATH + File.separator + FILE_NAME));
out.write(content.getBytes());
out.hsync();
LOG.info("success to write.");
} finally {
// make sure the stream is closed finally.
out.close();
}
}
对于已经在 HDFS 中存在的文件,可以追加指定的内容,以增量的形式在该文件现有内容的后面追加。通过调用 FileSystem 实例的 append 方法获取追加写入的输出流。然后使用该输出流将待追加内容添加到 HDFS 的指定文件后面。追加完指定的内容后,需要调用 close 方法关闭输出流。
注意:需要确保待追加的文件已经存在,并且没有正在被写入内容,否则追加内容会失败抛出异常。
追加文件内容代码样例如下。
/**
* 追加文件内容
*
* @throws java.io.IOException
*/
private void appendContents() throws IOException {
final String content = "Hello Hello";
FSDataOutputStream out = null;
try {
out = fSystem.append(new Path(DEST_PATH + File.separator + FILE_NAME));
out.write(content.getBytes());
out.hsync();
LOG.info("success to append.");
} finally {
// make sure the stream is closed finally.
out.close();
}
}
读文件即为获取 HDFS 上某个指定文件的内容。通过调用 FileSystem 实例的 open 方法获取读取文件的输入流。然后使用该输入流读取 HDFS 的指定文件的内容。读完文件后,需要调用 close 方法关闭输入流。
读文件代码样例如下。
private void read() throws IOException {
String strPath = DEST_PATH + File.separator + FILE_NAME;
Path path = new Path(strPath);
FSDataInputStream in = null;
BufferedReader reader = null;
StringBuffer strBuffer = new StringBuffer();
try {
in = fSystem.open(path);
reader = new BufferedReader(new InputStreamReader(in));
String sTempOneLine;
// write file
while ((sTempOneLine = reader.readLine()) != null) {
strBuffer.append(sTempOneLine);
}
LOG.info("result is : " + strBuffer.toString());
LOG.info("success to read.");
} finally {
// make sure the streams are closed finally.
IOUtils.closeStream(reader);
IOUtils.closeStream(in);
}
}
通过调用 delete 方法删除 HDFS 上某个指定目录。delete 方法第二个参数代表是否递归删除目录下面的所有目录。如果该参数为 false,而目录下还存在文件或者子目录,则删除目录操作会失败。
注意:待删除的目录会被直接删除,且无法恢复,因此,请谨慎使用删除目录操作。
删除目录代码样例如下。
private boolean deletePath(final Path dirPath) throws IOException {
if (!fSystem.exists(dirPath)) {
return false;
}
// fSystem.delete(dirPath, true);
return fSystem.delete(dirPath, true);
}
通过调用 delete 方法删除 HDFS 上某个指定文件。
注意:待删除的文件会被直接删除,且无法恢复,因此,请谨慎使用删除文件操作。
删除文件代码样例如下。
private void deleteFile() throws IOException {
Path beDeletedPath = new Path(DEST_PATH + File.separator + FILE_NAME);
if (fSystem.delete(beDeletedPath, true)) {
LOG.info("success to delete the file " + DEST_PATH + File.separator + FILE_NAME);
} else {
LOG.warn("failed to delete the file " + DEST_PATH + File.separator + FILE_NAME);
}
}
对于 HDFS 来说,文件的重命名和移动是一个操作。调用 FileSystem 的 rename 方法对 HDFS 文件系统的文件进行重命名操作。
注意:rename 方法的原文件必须存在,并且目标文件不能存在,否则重命名操作会失败。
移动或重命名文件代码样例如下。
private void renameFile() throws IOException {
Path srcFilePath = new Path(SRC_PATH + File.separator + SRC_FILE_NAME);
Path destFilePath = new Path(DEST_PATH + File.separator + DEST_FILE_NAME);
fs.rename(new Path(srcFilePath), new Path(destFilePath));
}
对于 HDFS 来说,目录的重命名和移动是一个操作。调用 FileSystem 的 rename 方法对 HDFS 文件系统的目录进行重命名操作。
注意:rename 方法的原目录必须存在,并且目标目录不能存在,否则重命名操作会失败。
移动或重命名目录代码样例如下。
private void renameDir() throws IOException {
Path srcDirPath = new Path(SRC_PATH + File.separator + SRC_DIR_NAME);
Path destDirPath = new Path(DEST_PATH + File.separator + DEST_DIR_NAME);
fs.rename(new Path(srcDirPath), new Path(destDirPath));
}