HDFS java API 二次开发

文章目录

  • HDFS读写流程
    • API java实现

HDFS读写流程

在这里插入图片描述
1.客户端通过调用 DistributedFileSystem 的create方法,创建一个新的文件。

2.DistributedFileSystem 通过 RPC(远程过程调用)调用 NameNode,去创建一个没有blocks关联的新文件。创建前,NameNode 会做各种校验,比如文件是否存在, 客户端有无权限去创建等。如果校验通过,NameNode 就会记录下新文件,否则就会抛出IO异常。

3.前两步结束后会返回 FSDataOutputStream 的对象,和读文件的时候相似,FSDataOutputStream 被封装成 DFSOutputStream,DFSOutputStream 可以协调NameNode和 DataNode。客户端开始写数据到DFSOutputStream,DFSOutputStream会把数据切成一个个小packet,然后排成队列data queue。

4.DataStreamer 会去处理接受 data queue,它先问询 NameNode 这个新的 block 最适合存储的在哪几个DataNode里,比如重复数是3,那么就找到3个最适合的DataNode,把它们排成一个 pipeline。DataStreamer 把 packet 按队列输出到管道的第一个 DataNode 中,第一个 DataNode又把 packet 输出到第二个 DataNode 中,以此类推。

5.DFSOutputStream 还有一个队列叫 ack queue,也是由 packet 组成,等待DataNode的收到响应,当pipeline中的所有DataNode都表示已经收到的时候,这时akc queue才会把对应的packet包移除掉。6.客户端完成写数据后,调用close方法关闭写入流。

7.DataStreamer 把剩余的包都刷到 pipeline 里,然后等待 ack 信息,收到最后一个ack 后,通知 DataNode 把文件标示为已完成。

在这里插入图片描述

Client首先调用FileSystem对象的open方法,其实获取的是一个DistributedFileSystem的 实例。

DistributedFileSystem通过RPC向NameNode获得文件的第一批block的locations,返回多个locations,这些locations按照hadoop拓扑结构排序,距离客户端近的排在前面。

前两步会返回一个FSDataInputStream对象,该对象会被封装成 DFSInputStream 对象,DFSInputStream可以方便的管理datanode和namenode数据流。客户端调用read方 法,DFSInputStream就会找出离客户端最近的datanode并连接datanode。

数据从datanode源源不断的流向客户端。

如果第一个block块的数据读完了,就会关闭指向第一个block块的datanode连接, 接着读取下一个block块。这些操作对客户端来说是透明的,从客户端的角度来看只是 读一个持续不断的流。

如果第一批block都读完了,DFSInputStream就会去namenode拿下一批blocks的location,然后继续读,如果所有的block块都读完,这时就会关闭掉所有的流。

API java实现

1package hdfs; 2 3import org.apache.commons.io.IOUtils; 4import org.apache.hadoop.conf.Configuration; 5import org.apache.hadoop.fs.FileSystem; 6import org.apache.hadoop.fs.*; 7 8import java.io.*; 9 10public class aaa { 11 FileSystem fs = null; 12 Configuration conf =null; 13 public void init()throws Exception { 14 //Hadoop类库中提供的HDFS操作的核心API是FileSystem抽象类,该类提供了一系列方法来进行相关的操作 15 //FileSystem (不能生成对象)是一个通用的文件系统API,是一个抽象类,该类提供了一系列方法来进行相关的操作 16 //configuration对象是对文件系统中属性信息的封装,默认的属性来自hadoop配置文件core-site.xml中的配置 17 //进行文件操作的基本流程:1.创建configuration对象 2.利用fileSystem的静态get方法获取FileSystem实例 3.调用FileSystem的方法进行实际的文件操作 18 conf=new Configuration(); 19 //conf.set("fs.defaultFS","hdfs://node01:8020");//设置本程序所访问的HDFS服务的地址,如果不通过代码进行设置,会默认取Hadoop安装环境下core-site.xml文件中配置的fs.defaultFS的属性值 20 fs=FileSystem.get(conf);//由于不能new,所以通过get静态方法来获取实例 21 22 23 } 24 //上传文件 25 public void upload() throws Exception{ 26 fs.copyFromLocalFile(new Path("D:/text02.txt"), new Path("/")); 27// 关闭 28 fs.close(); 29 30 } 31 //使用流的方式上传文件 32 public void ioupload()throws IllegalArgumentException,IOException{ 33 //true表示是否覆盖源文件 34 FSDataOutputStream out = fs.create(new Path("/text01.txt"),true); 35 FileInputStream in = new FileInputStream("D:/text02.txt"); 36 //org.apabhe,commons.io下的IOUtils 37 IOUtils.copy(in,out); 38 39 } 40 41 //下载文件 42 public void download()throws Exception{ 43 fs.copyToLocalFile(new Path("/text01.txt"),new Path("D:/text02.txt")); 44 fs.close(); 45 46 } 47 //以流的方式下载文件 48 public void download2()throws Exception{ 49 FSDataInputStream in = fs.open(new Path("/text01.txt")); 50 OutputStream out = new FileOutputStream("D:/text02.txt"); 51 IOUtils.copy(in,out); 52 53 54 } 55 //创建目录 56 public void mkdir()throws IllegalArgumentException,IOException{ 57 //可递归创建目录,返回值表示是否创建成功 58 boolean b = fs.mkdirs(new Path("/wj")); 59 System.out.println(b); 60 } 61 62 //删除目录或文件 63 public void delete ()throws IllegalArgumentException,IOException{ 64 //true 表示递归删除,返回值代表是否删除成功 65 boolean b = fs.delete(new Path("/wj")); 66 System.out.println(b); 67 } 68 //打印指定路径下的文件信息(不含目录,可递归) 69 public void listFile()throws FileNotFoundException,IllegalArgumentException,IOException{ 70 //true表示是否递归,返回的是迭代器对象 71 RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"),true); 72 //fs.listFiles方法,返回LocatedFileStatus的迭代器,自带递归。但是它是继承于FileStatus的,而且构建函数是FileStatus的文件版,即LocaledFileStatus只能列出文件 73 while(listFiles.hasNext()){ 74 LocatedFileStatus file = listFiles.next(); 75 System.out.println("owner:"+file.getOwner()); 76 System.out.println("filename:"+file.getPath().getName()); 77 System.out.println("blocksize:"+file.getBlockSize()); 78 System.out.println("replication"+file.getReplication()); 79 System.out.println("permission"+file.getPermission()); 80 BlockLocation[] blockLocations = file.getBlockLocations(); 81 for (BlockLocation b : blockLocations){ 82 System.out.println("块的起始偏移量"+b.getOffset()); 83 System.out.println("块的长度"+b.getLength()); 84 String[] hosts = b.getHosts(); 85 for (String host : hosts){ 86 System.out.println("块的所在服务器:"+host); 87 88 } 89 } 90 System.out.println("==================================="); 91 } 92 } 93 //指定目录下的文件或目录信息(不可递归) 94 public void list()throws FileNotFoundException,IllegalArgumentException,IOException{ 95 //返回的是数组,不能递归目录中的内容 96 //FileStatus对象封装了文件系统中文件和目录的元数据等信息 97 FileStatus[] listStatus = fs.listStatus(new Path("/"));//列出文件系统 98 for (FileStatus fs : listStatus){ 99 System.out.println((fs.isFile()?"file:":"directory:")+fs.getPath().getName()); 100 } 101 } 102 public static void main(String[] args)throws Exception{ 103// 选择本地运行,需要在本地安装hadoop环境 104 System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.5\\"); 105 aaa HDFS =new aaa(); 106 HDFS.init(); 107 HDFS.upload(); 108 //HDFS.ioupload(); 109 //HDFS.mkdir(); 110 //HDFS.delete(); 111 //HDFS.listFile(); 112 //HDFS.list(); 113 114 } 115} 116 117 118

代码交流 2021