HDFS补充点知识
一、HDFS分块抽象的好处
1 文件大小可以大于任意一个磁盘的容量,块并不需要存储在同一个磁盘上
2 抽象块作为存储单元,简化存储子系统的设计
- datanode将块作为处理对象,能存储多少块也能计算出
- namenode管理元数据
3 数据备份提高容错能力和可用性
HDFS块为什么这么大?
最小化寻址开销
块足够大时,磁盘传输速率会远远大于寻址时间,则传输时间更依赖于磁盘传输速率
二、 HDFS处理大量小文件
大量小文件的缺点:
- 占用namenode内存
- 处理时增加map任务数量,增加寻址次数
1 使用HAR(Hadoop Archives),构建在其他文件系统之上用于文件存档的文件系统
Hadoop存档文件系统通常将HDFS中的多个文件打包成一个存档文件,减少namenode内存的使用
hadoop archive命令创建HAR文件
2 使用sequencefile
3 运行时使用CombinerFileInputFormat
点此查看对小文件的处理
三、HDFS的容错机制
1 远程备份组成元数据持久状态的文件
将持久状态写入本地磁盘的同时,写入一个远程的网络文件系统NFS,操作是实时且原子性的
2 运行secondary namenode(辅助namenode)
辅助namenode主要是合并edits和fsimage,并保存合并后的fsimage
namenode出现故障时,启用辅助namenode,因为辅助namenode上的fsimage未包含最近的edits文件,所以一般是将远程NFS上的元数据信息拷贝到辅助namenode上作为主namenode运行
具体见HDFS学习总结的checkpoint的工作机制
四、文件权限
hadoop fs -ls /countout
启用权限控制
dfs.permission.enabled
修改文件权限
hadoop fs -chmod 777 /wordcount
文件模式
drwx: 目录 可读 可写 可执行
-r-x: 文件 可读 不可写 可执行
五、高可用相关
故障切换failover controller
- 管理将active namenode转换为standby namenode的过程
- 默认使用zookeeper保证只有一个active namenode
- 每个namenode上运行一个failover controller,监控宿主namenode状态(使用心跳机制)并在active namenode失效时进行故障切换
- 使用配置文件实现故障切换控制
规避fencing
- 为保证之前的active namenode确实停止运行,高可用使用“规避”
- QJM:仅允许一个namenode向编辑日志中写入数据,设置一个SSH规避命令用于杀死namenode进程
NFS过滤器:不能实现只有一个namenode写入数据
撤销访问权限
屏蔽网络端口
断电
高可用实现了什么
一个新的namenode满足以下条件才可以开始为集群提供服务,导致冷启动时间需要30min,甚至更长
- 将命名空间映像加载到内存中
- 重演编辑日志
- 接收到足够多datanode的数据块报告并退出安全模式
高可用HDFS架构实现:
- 使用高可用的共享存储实现编辑日志的共享
- datanode需要同时向两个namenode发送数据块处理报告,因为文件与数据块的映射关系存储在namenode内存中,而不是磁盘上
- 使用故障切换和规避机制处理namenode的失效问题,这对用户是透明的
- 备用namenode包含了辅助namenode的功能,并为活动namenode的命名空间设置周期性检查点
结果:
因为备用namenode内存中也持有最新的状态:包含最新的编辑日志条目和最新的数据块映射信息,所以只需要几十秒到一分钟就可以实现任务接管
六、java API实现流式数据访问
package HDFS;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Test;
import java.io.IOException;
import java.net.URISyntaxException;
/**
* Unit test for simple App.
*/
public class AppTest {
@Test
/**
* HDFS获取文件系统
*/
public void initHDFS() throws Exception {
// 1 创建配置信息对象
Configuration configuration = new Configuration();
// 2 获取文件系统
FileSystem fs = FileSystem.get(configuration);
// 3 打印文件系统
System.out.println(fs.toString());
}
@Test
/**
* HDFS文件上传(测试参数优先级)
*/
public void testCopyFromLocalFIle() throws URISyntaxException, IOException, InterruptedException {
//1 获取文件系统
Configuration configuration = new Configuration();
//configuration.set("dfs.replication","2");
FileSystem fs = FileSystem.get(configuration);
//2 上传文件
fs.copyFromLocalFile(new Path("D:/222.txt"),new Path("/idea.txt"));
//3 关闭资源
fs.close();
System.out.println("over");
}
/**
* HDFS文件的下载
*/
@Test
public void testCopyToLocalFile() throws URISyntaxException, IOException, InterruptedException {
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(configuration);
// 2 执行下载操作
//boolean delSrc 指是否将原文件删除
//Path src 指要下载的文件路径
//Path dst 指将文件下载到的路径
//boolean useRawLocalFileSystem 是否开启文件校验
fs.copyToLocalFile(false,new Path("/wordcount.txt"),new Path("D:/wordcount.txt"),true);
//3 关闭资源
fs.close();
System.out.println("over");
}
@Test
/**
* HDFS文件目录的创建
*/
public void testMkdir() throws URISyntaxException, IOException, InterruptedException {
//1.获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(configuration);
//2.创建目录
fs.mkdirs(new Path("/Test/Mkdir/is/success"));
//3.关闭资源
fs.close();
}
/**
* HDFS文件的删除
* @throws IOException
*/
@Test
public void testDelete() throws IOException, URISyntaxException, InterruptedException {
//1.获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(configuration);
//2.执行删除
fs.delete(new Path("/Test"),true);// 后面的布尔选项是递归删除
//3.关闭资源
fs.close();
}
/**
* HDFS 文件名的修改
* @throws URISyntaxException
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testRename() throws URISyntaxException, IOException, InterruptedException {
//1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(configuration);
//2 修改文件名称
fs.rename(new Path("/test"),new Path("/test.txt"));
//3. 关闭资源
fs.close();
}
@Test
public void testListFiles() throws URISyntaxException, IOException, InterruptedException {
//1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(configuration);
long s = fs.getStatus().getCapacity();
System.out.println(s);
//2 获取文件详情
/**
* 思考:为什么返回迭代器不是list之类的容器
* 集合占内存多 一次全部拿取
* 迭代器一次拿一个
*/
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"),true);
while (listFiles.hasNext()){
LocatedFileStatus status = listFiles.next();
//输出详情
//文件名称
System.out.println("文件名:"+status.getPath().getName());
//长度
System.out.println("文件大小"+status.getLen());
//获取组
System.out.println("所属组:"+status.getGroup());
//权限
System.out.println("权限:"+status.getPermission());
//获取存储的块信息
BlockLocation[] blockLocations = status.getBlockLocations();
for (BlockLocation blockLocation:blockLocations){
//获取块的存储的主机节点
String[] hosts = blockLocation.getHosts();
// String[] ip = blockLocation.getNames();
for (String host:hosts){
System.out.println("获取块的存储的主机节点:"+host);
}
//
}
System.out.println("-------------------------");
}
fs.close();
}
}
七、副本放在哪里??
副本放哪里?
考量因素:可靠性,写入带宽,读取带宽
权衡冗余性和带宽损耗
多个副本:第一个副本放在运行客户端的节点上,第二个副本放在不同的机架的节点上,第三个副本与第一个副本放在相同机架上,但在不同节点上,其他副本随机放在集群中的节点
八、客户端和与之交互的HDFS,namenode,datanode之间的数据流
文件读取
客户端调用FileSystem的open()方法打开文件,对于HDFS,是一个DistributedFileSystem的实例
- DistributedFileSystem的实例通过RPC调用namenode,namenode根据映射关系返回存有文件块副本的datanode地址,datanode根据与客户端的距离排序(网络拓扑),如果本身也是datanode且存有文件块,则客户端从本地获取副本
- DistributedFileSystem返回一个FSDataInputStream对象给客户端以便读取数据,FSDataInputStream封装了DFSInputStream,这个类管理者datanode和namenode的I/O,客户端对DFSInputStream反复调用read()方法,将数据从datanode传输到客户端
- 块读取完毕后,DFSInputStream关闭与该datanode的连接,寻找下一个块所在的最佳datanode,重复读取,一直到读取完毕,调用FSDataInputStream的close()
DFSInputStream的作用
- 管理namenode和datanode的I/O
- 存储namenode告知的块位置
- 如果遇到与datanode通信错误,则连接另一个最近的有副本的datanode,并记录故障节点,不再与它通信
校验datanode块的完整性,如果有问题,告诉namenode,并从其他节点读取副本
文件写入
客户端使用DistributedFileSystem对象调用create()创建一个新文件
- 对namenode进行RPC调用在文件系统命名空间新建文件,namenode执行检查确认文件不存在且客户端有新建文件权限
- DistributedFileSystem向客户端返回一个FSDataOutputStream对象,FSDataOutputStream封装了一个DFSOutputStream对象,负责与namenode和datanode通信
写入数据时
DFSOutputStream将数据分为数据包,写入“数据队列”(data queue)
DataStreamer挑选出适合存储副本的一组datanode,并向namenode请求分配新的数据块
存储副本的datanode形成管线,DataStreamer将数据包流式传输到第一个datanode,这个datanode通过管线将数据包传递给下一个datanode,依次到结束
DFSOutputStream维护确认队列(ack queue),收到所有datanode确认信息后,数据包从确认队列删除
- 客户端完成数据写入后,调用FSDataOutputStream的close()方法,将剩余数据包全部写入datanode管线,并等待确认信息,最后告知namenode文件写入完成
如果datanode在写入时发生故障
- 关闭管线,把队列中的所有数据包(共同组成一个块)添加回数据队列的最前端,确保下游的datanode不漏掉任何一个数据包
- 将存储了正确数据块信息的datanode添加标识,把标识传给namenode,故障节点在恢复后删除存储的部分数据块,在管线添加一个新的datanode,替换故障datanode
九、HDFS 一致模型和distcp
- 一致模型
新建文件后,能在文件系统的命名空间立即可见
写入文件的内容不能保证立即可见,因为正在写入的块对reader不可见,会不一致
hflush():FSDataInputStream的方法,HDFS保证目前写入的数据都到达datanode的写入管线并对所有reader可见,确保数据在内存中,不确保在已经写入磁盘;HDFS的close()方法已经包含执行hflush()
hsync():确保数据已经写入磁盘,提交文件描述符的缓冲数据
重要性:
如果不使用一致模型,系统故障时可能会丢失数据块。
一致模型提供了鲁棒性,但增加HDFS负载,吞吐量受到影响,所以要根据实际找到合适的调用频率
- distcp
Hadoop自带程序,并行地将数据复制进出Hadoop,作为一个MR作业实现,并行运行map,没有reducer,-m可以修改运行map的数量
hadoop distcp file1 file2
hadoop disctp dir1 dir2
//如果dir2不存在,则新建dir2
//如果dir2存在,则形成dir2/dir1
//可以指定多个来源dir
hadoop distcp -update dir1 dir2
hadoop distcp -overwrite dir1 dir2
HDFS集群间传送数据
hdfs webhdfs HttpFs
hadoop distcp -update -delete -p hdfs://namenode1/foo hdfs://namenode2/foo
hadoop distcp webhdfs://namenode1:50070/foo webhdfs://namenode2:50070/foo
保持HDFS集群均衡
复制数据可能影响集群的均衡性,最好先使用默认的每个节点20个map运行distcp运行,还可以使用之后介绍的均衡器工具改善均匀程度
例如-m设置为1,则首先选择运行map的当前节点存储副本,其他副本分散在集群中,这导致运行节点总会有副本,不均衡
还没有评论,来说两句吧...