HDFS API

Programming with the HDFS API

FileSystem: the entry point for HDFS programming

I. Add the dependency and import the packages

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.2</version>
</dependency>

Import the required packages:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

For the specific methods of these classes, see:

https://hadoop.apache.org/docs/r3.2.2/api/org/apache/hadoop/fs/FileSystem.html

https://hadoop.apache.org/docs/r3.2.2/api/org/apache/hadoop/conf/Configurable.html

https://hadoop.apache.org/docs/r3.2.2/api/org/apache/hadoop/fs/Path.html

II. Obtain the HDFS FileSystem object

Almost all of Hadoop's file-operation classes live in the "org.apache.hadoop.fs" package. These APIs support operations such as opening files, reading and writing files, and deleting files.

The class that the Hadoop library ultimately exposes to users is FileSystem. It is an abstract class, so a concrete instance can only be obtained through its static get method. get has several overloads; the most commonly used one is:

public static FileSystem get(Configuration conf) throws IOException {
    return get(getDefaultUri(conf), conf);
}
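
Other overloads also let you pass the filesystem URI and the user name explicitly. A minimal sketch, assuming you want to connect to a specific namenode as a specific user (the hostname and user name below are placeholders, not values from this article):

// Needs: import java.net.URI;  This overload also throws InterruptedException.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://hostname:9000"), conf, "hadoop");
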
  1. Get a Configuration object

    First, create a new Configuration object:

    Configuration config = new Configuration(); // loads Hadoop's default configuration files

    (In production you usually do not need extra settings.) If you do need to override values, call the Configuration object's set method, for example:

    config.set("fs.defaultFS", "hdfs://hostname:9000");
    config.set("dfs.client.use.datanode.hostname", "true"); // also add dfs.datanode.use.datanode.hostname=true to hdfs-site.xml
    config.set("dfs.replication", "1"); // without this, the default replication factor is 3
    // Change the Hadoop user name via a system property:
    // System.setProperty("HADOOP_USER_NAME", "hadoop");
  2. Get the FileSystem object

    Pass the Configuration object to FileSystem's get() method to obtain the FileSystem object hdfs:

    FileSystem hdfs = FileSystem.get(config);
  3. Perform file operations

    Paths used in these operations are represented with the org.apache.hadoop.fs.Path class; the common constructor is:

    Path src = new Path("pathString");
  4. Release resources

    if (null != hdfs) {
        hdfs.close();
    }
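
Putting the four steps together, here is a minimal end-to-end sketch (the directory /tmp/hdfs-api-demo is a hypothetical path used only for illustration):

Configuration config = new Configuration();
FileSystem hdfs = null;
try {
    hdfs = FileSystem.get(config);
    Path dir = new Path("/tmp/hdfs-api-demo"); // hypothetical path
    hdfs.mkdirs(dir);                          // step 3: a file operation
} finally {
    if (null != hdfs) {
        hdfs.close();                          // step 4: release resources
    }
}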

III. Perform operations with the API

Get the FileSystem object hdfs:

Configuration config = new Configuration();
FileSystem hdfs = FileSystem.get(config);

mkdirs: create a directory

Path path = new Path("/pathString");
hdfs.mkdirs(path);

copyFromLocalFile: copy a file from the local filesystem to HDFS

Path src = new Path("srcFile");
Path dst = new Path("dstFile");
hdfs.copyFromLocalFile(src, dst);
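
copyFromLocalFile also has overloads that take delSrc and overwrite flags; a small sketch (the paths are placeholders):

// delSrc = false: keep the local source; overwrite = true: replace an existing target.
hdfs.copyFromLocalFile(false, true, new Path("srcFile"), new Path("dstFile"));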

copyToLocalFile: copy a file from HDFS to the local filesystem

Path src = new Path("srcFile");
Path dst = new Path("dstFile");
hdfs.copyToLocalFile(src, dst);
//hdfs.copyToLocalFile(true, src, dst); // true = delSrc (delete the source); rarely used

rename: move or rename a file

Path src = new Path("path1");
Path dst = new Path("path2");
hdfs.rename(src, dst);

Note: the Javadoc quoted below (including the OVERWRITE behaviour and the deprecation notice) describes the three-argument overload rename(Path src, Path dst, Options.Rename... options); the two-argument rename(Path, Path) used above is not deprecated and returns a boolean indicating success.

Renames Path src to Path dst

  • Fails if src is a file and dst is a directory.
  • Fails if src is a directory and dst is a file.
  • Fails if the parent of dst does not exist or is a file.

If OVERWRITE option is not passed as an argument, rename fails if the dst already exists.

If OVERWRITE option is passed as an argument, rename overwrites the dst if it is a file or an empty directory. Rename fails if dst is a non-empty directory.

Note that atomicity of rename is dependent on the file system implementation. Please refer to the file system documentation for details. This default implementation is non atomic.

This method is deprecated since it is a temporary method added to support the transition from FileSystem to FileContext for user applications.

  • Parameters:

src - path to be renamed

dst - new path after rename

  • Throws:

FileNotFoundException - src path does not exist, or the parent path of dst does not exist.

FileAlreadyExistsException - dest path exists and is a file

ParentNotDirectoryException - if the parent path of dest is not a directory

IOException - on failure
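
With the two-argument version it is worth checking the boolean return value, since failures such as a missing parent directory are reported that way rather than as exceptions. A small sketch:

boolean renamed = hdfs.rename(new Path("path1"), new Path("path2"));
if (!renamed) {
    System.err.println("rename failed (target may already exist or parent may be missing)");
}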

listFiles: list files

RemoteIterator<LocatedFileStatus> files = hdfs.listFiles(new Path("/user/hadoop"), true); // true = recursive

while (files.hasNext()) {
    LocatedFileStatus fileStatus = files.next();
    String isDir = fileStatus.isDirectory() ? "d" : "-";
    String permission = fileStatus.getPermission().toString();
    short replication = fileStatus.getReplication();
    long len = fileStatus.getLen();
    String path = fileStatus.getPath().toString();

    System.out.println(isDir + permission + "\t" + replication + "\t" + len + "\t" + path);

    BlockLocation[] blockLocations = fileStatus.getBlockLocations();
    //for (BlockLocation blockLocation : blockLocations) {
    //    String[] hosts = blockLocation.getHosts();
    //    for (String host : hosts) {
    //        System.out.println(host);
    //    }
    //}
    int blockLen = blockLocations.length;
    for (int i = 0; i < blockLen; i++) {
        String[] hosts = blockLocations[i].getHosts();
        System.out.println("block_" + i + "_location:" + hosts[0]);
    }
}
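
For a non-recursive, single-level listing you can use listStatus instead; a minimal sketch (the path is a placeholder):

FileStatus[] statuses = hdfs.listStatus(new Path("/user/hadoop"));
for (FileStatus status : statuses) {
    System.out.println(status.getPath() + "\t" + (status.isDirectory() ? "dir" : "file"));
}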

delete: delete a file

Path path = new Path("deleteFilePath");
hdfs.delete(path, false);
//hdfs.delete(new Path("deleteFilePath"), true); // true = delete recursively

exists: check whether a file exists

Path findFile = new Path("filePath");
boolean isExists = hdfs.exists(findFile);
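
exists and delete are often combined, for example to delete a path only if it is present; a small sketch (the path is a placeholder):

Path target = new Path("filePath");
if (hdfs.exists(target)) {
    boolean deleted = hdfs.delete(target, false); // false = do not recurse
    System.out.println("deleted: " + deleted);
}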

FileStatus: get the last modification time of an HDFS file

Path path = new Path("fileName");
FileStatus fileStatus = hdfs.getFileStatus(path);
long modificationTime = fileStatus.getModificationTime();
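
getModificationTime() returns epoch milliseconds; to make the value readable you could format it, for example:

// Format the epoch-millisecond timestamp as a human-readable date.
String readable = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        .format(new java.util.Date(modificationTime));
System.out.println(readable);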

Other: copy files with read/write IO streams

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
  • Upload a local file to the cluster (put)
    Read the local file (input stream) and write it to HDFS (output stream)
BufferedInputStream in = new BufferedInputStream(new FileInputStream(new File("data/wc.data")));
FSDataOutputStream out = hdfs.create(new Path("/input/wc-io.txt"));

IOUtils.copyBytes(in, out, 4096);

IOUtils.closeStream(out);
IOUtils.closeStream(in);
  • Download a file from the cluster to the local filesystem
    Read the data from HDFS (input stream) and write it locally (output stream)

    FSDataInputStream in = hdfs.open(new Path("/input/wc-io.txt"));
    FileOutputStream out = new FileOutputStream(new File("output/b.txt"));

    IOUtils.copyBytes(in, out, 4096);

    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
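
IOUtils.copyBytes also has an overload that closes both streams for you when it finishes; a sketch of the same download using it:

    FSDataInputStream in = hdfs.open(new Path("/input/wc-io.txt"));
    FileOutputStream out = new FileOutputStream(new File("output/b.txt"));
    // The fourth argument (true) tells copyBytes to close both streams when done.
    IOUtils.copyBytes(in, out, 4096, true);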

For more HDFS API methods, see: https://hadoop.apache.org/docs/r3.2.2/api/org/apache/hadoop/fs/FileSystem.html