前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据平台:资源管理及存储优化技术

大数据平台:资源管理及存储优化技术

原创
作者头像
Yiwenwu
修改2024-05-09 20:35:54
3570
修改2024-05-09 20:35:54
举报
文章被收录于专栏:大数据&分布式大数据&分布式

背景介绍

大数据平台的资源管理组件主要涉及存储资源计算资源管理两部分,属于大数据平台运维管理系统。基于资源管理系统,大数据平台的开发运维人员能够清晰掌控平台的资源使用情况和资源在不同时间段下的变化趋势,能对资源使用异常进行及时发现并定位处理,避免造成更严重的影响,如磁盘空间撑爆,计算资源无空余,任务长时间等待不运行等造成业务阻塞。

资源管理系统核心目的:对于大数据平台的资源管理,让一切人对机器的操作尽可能自动化,让一切人的决策基于数据,提供如下能力:

  1. 提升可见性:增加大数据平台的存储、计算资源的可见性;
  2. 小文件优化:HDFS小文件管理,减少在存储和运算过程中资源浪费;
  3. 热度分析:存储文件的热度分析,存储格式优化、压缩;重要数据备份;
  4. 生命周期管理:数据生命周期管理,支持存储时效设置,避免数据僵化;
  5. 计算任务诊断:基于计算任务解析,自动给出任务的优化建议;

核心能力

大数据平台的资源管理主要从两个维度出发:存储、计算;以增强和便捷大数据平台的运维能力,包括如下方面:

  • 解决小文件引发的Hadoop系统问题:HDFS是为了存储大文件设计产生的,为增加文件的访问效率,HDFS会将所有的文件元数据信息以内存的形式存储在NameNode节点。若HDFS存储大量的小文件,会造成NameNode的内存飙升,性能下降,成为瓶颈,且易引发频发Full GC;
  • 提供存储资源的生命周期管理:HDFS支持存储大量的大文件,但是随着业务的发展,文件不断堆积,可使用的存储空间不断下降。因此需要对HDFS的存储文件进行生命周期管理,甄别长期不用的文件并支持对过期文件进行删除,从而节省HDFS存储资源;
  • 资源趋势可见性:通过可视化界面和不同的筛选条件获取整个大数据平台的存储、计算资源的使用情况和变化趋势,根据总体和各个租户的使用趋势对资源进行提前规划统筹,若出现资源异常增长和下降,能够及时发现并告警;
  • 支持细粒度成本核算:根据各个租户的资源的使用情况和占比,可以更精确的核算各个租户下的资源使用成本,便于后续成本分摊和预算评估;
  • 资源成本优化:根据资源的使用情况分析,可分别对存储、计算资源进行优化,如根据数据的热度,对存储文件进行压缩或删除;停止Yarn的孤子任务,减少计算成本;分析任务的运行情况,自动给出对应的优化建议;

TBDS资源管理核心功能包括三部分:

  • 拉取待解析数据:基于HDFS NameNode、HDFS Client、History Server、JobHistroy等组件拉取HDFS存储数据和Yarn计算任务数据;
  • 解析数据:基于Spark Job等方式对拉取的数据进行解析分析,如计算目录下总文件、小文件总数等指标,并将计算结果保存对应DB中(MySQL、Phoenix);
  • 运维调度:基于设置的规则周期性或手动触发调度任务,执行对应的运维操作,例如:小文件合并,文件生命周期管理;

优化技术

大数据平台存储优化,主要基于HDFS实现,HDFS整体架构如下所示,属于主从(master-slave)架构,一个HDFS集群一般包括:

  • NameNode:Master主节点,用于管理文件系统命名空间(file system namespace)和外部客户端请求处理,负责管理元数据,管理DataNode等;
  • DataNode:一系列Slave从节点,用于对应节点的数据存储管理,负责数据Block的管理;

1. HDFS分层存储

根据HDFS上存储数据的使用频率,将数据标记为不同的温度,数据温度标记示例如下:

HDFS从Hadoop2.3开始支持分层存储,可以基于不同的数据温度映射到不同的存储层,利用服务器不同类型的存储介质(HDD硬盘,SSD,内存等)提供更多的存储策略。

HDFS支持的分层存储介质,基于温度从低到高包括:

  • ARCHIVE:高存储密度但耗电较少的存储介质,例如磁带,通常用来存储冷数据
  • DISK:磁盘介质,这是HDFS最早支持的存储介质
  • SSD:固态硬盘,是一种新型存储介质,目前被不少互联网公司使用
  • RAM_DISK:数据被写入内存中,同时会往该存储介质中再(异步)写一份

其中HDFS存储策略:

  • Lazy_persist:一个副本保存在内存RAM\_DISK中,其余副本保存在磁盘中;
  • ALL_SSD:所有副本都保存在SSD中;
  • One_SSD:一个副本保存在SSD中,其余副本保存在磁盘中;
  • Hot:所有副本保存在磁盘中,这也是默认的存储策略;
  • Warm:一个副本保存在磁盘上,其余副本保存在归档存储上;
  • Cold:所有副本都保存在归档存储上;

2. HDFS纠删码

在Hadoop 2.x及以前的版本中,HDFS主要依靠数据副本来实现容错,通常会创建三个副本来保证数据可用性。

纠删码(erasure coding,EC):是一种数据保护技术,RAID的延伸,将数据分割为片段,把冗余数据块扩展、编码,并将其存储在不同的节点位置,是分布式存储中热门技术。纠删码基于数学函数来描述对象,以检查对象的准确性,若数据丢失和非准确,可以根据纠删码恢复,常用的纠删码技术:多项式插值(polynomial interpolation)过采样(oversampling)

基本结构:总数据块(n) = 原始数据库(k) + 校验块(m),即n=k+m;

纠删码主要使用两类编码:

1. RS编码

RS(Reed-Solomon)编码,是一种常用的向前纠错(forward error correction, FEC)的纠错编码,是MDS种常用的一类。是一类纠能力很强的多进制BCH码。RS编码涉及三个主要问题:

  • 使用范德蒙矩阵(Vandermonde Matrix) 计算原始数据的校验字;
  • 使用高斯消元法(Gaussian Elimination)从数据错误中恢复原始数据;
  • 在有限域(Galois Fields)上进行快速计算;

2. LRC编码

LRC(locally repairable codes)编码:是基于RS编码改进,可有效减少数据修复时的系统负载,即:在可靠性与RS编码大致相同的情况下,减少恢复损坏数据所需的数据块数量。

  • 块局部性(Block Locality):对于某个纠删码,要对一个数据块编码,最多需要多少其他的数据块;
  • 最小码距(Minimum Code Distance):对于切割为n块的文件,最少损毁多少块数据文件就不可恢复了;

在Hadoop3.0开始引入支持HDFS文件块级别的纠删码底层采用Reed-Solomon(k,m)算法。在不牺牲太多计算性能的情况下,以更小的存储空间提供与传统副本相当的数据冗余能力。

3. HDFS Federation

HDFS Federation为HDFS系统提供了NameNode水平扩容能力,避免NameNode中的存储元数据丢失,造成单点故障。社区最早从Apache Hadoop 0.23.0版本开始引入了HDFS federation。HDFS Federation是指 HDFS集群可同时存在多个NameNode/Namespace,每个Namespace之间是互相独立的;单独的一个Namespace里面包含多个 NameNode,其中一个是主,剩余的是备,这个和上面我们介绍的单Namespace里面的架构是一样的。这些Namespace共同管理整个集群的数据,每个Namespace只管理一部分数据,之间互不影响。

迭代1:ViewFs

基于 HDFS Federation模式,允许一个集群内存在多个Namespace,每个Namespace只管理部分数据。客户端在查询时需要无感知查询数据,而无需关注各Namespace的数据组合。社区引入了视图文件系统(View File System,简称 ViewFs),映射各个Namespace的指定存储路径,类似于某些 Unix / Linux 系统中的客户端挂载表,fs.defaultFS的值已经变成了viewfs://clusterX。

如图:ViewFs分绑定了不同Namespace的路径包括 /data, /project, /user, /tmp.

路径挂载
路径挂载

迭代2:Router-based Federation (RBF)

ViewFs的实现方案,存在几个问题:

  • 升级困难:ViewFs基于客户端实现,若版本变更,客户端全量升级比较困难
  • 手动维护:主要基于配置文件管理路径映射,维护成本高

为解决以上问题,社区从Hadoop 2.9.0(HDFS-10467)开始引入一种基于路由的Federation方案(Router-Based Federation),简称RBF,基于服务端实现。在Client和集群NameNode服务之间新加了一层Router服务,Router接口与NameNode接口保持一致,基于State Store维护 Router挂载路由信息,所有客户端请求由Router转发给下游的NameNode节点。

整体架构
整体架构

备注:基于RBF进行NameNode联邦,由于所有请求需要经过Router转发,Router服务也会存在性能瓶颈

4. 小文件合并

由于Hadoop的Block size一般是64MB,128MB或者256MB,如果文件小于默认值,也会存储占用一个Block存储,而这些明显小于Block大小的HDFS文件称为小文件。

严峻的小文件问题会对Namespace内存管理和计算性能造成影响:

  • 内存激增:Namenode会将文件元数据信息维护在内存中,如果文件增加到10亿级别,则需要差不多300GB内存,导致NameNode压力过大
  • 重启耗时长:若重启NameNode,需要触发元数据重新加载到内存中,导致恢复启动时间较长
  • 大量随机IO:一次大文件的顺序读取性能往往优于大量的小文件随机读取的性能

现有的小文件合并方法主要包括:

  • Hadoop Archive Files:将许多小文件打包归档到更大的HAR文件中来缓解NameNode内存问题;
  • CombineFileInputFormat:是Hadoop提供的抽象类,在MapReduce读取时合并小文件;
  • Hive合并小文件:使用Hive时会“create table as”或“insert overwrite”语句输出了小文件,通过设置参数,Hive可以在SQL作业执行完毕后会单独起一个MapReduce任务来合并输出的小文件;

实现细节

以下将针对小文件合并的实现细节进行说明主要分为三个步骤:

  • 镜像解析:解析NameNode FsImage镜像文件,获取所有的文件目录的元数据信息;
  • 离线分析任务:对于大集群,文件目录可达10亿级别,为完成文件目录分析,识别出小文件,可采用spark离线任务进行分析执行;
  • 小文件合并:基于识别出的小文件信息,根据对应的文件头类型和压缩类型进行合并处理;

镜像解析

FsImage镜像文件是Protobuf编码的, HDFS官方提供多种解析方式,将PB镜像文件解析为易读的文本格式,详情查看,支持的输出方式有:

  • Web:默认输出,启动HTTP服务,提供只读的WebHDFS API,基于API可以遍历HDFS文件信息
  • XML:以XML形式输出,该方式解析后的存储最大,可达到原始FsImage的5.x倍
  • Delimited:一行数据代表一个HDFS文件信息;单行数据以分隔符进行文件属性分割,默认分隔符\t
  • FileDistribution:分析大致的HDFS文件分布

解析命令说明如下:

代码语言:javascript
复制
hdfs oiv

Required command line arguments:
-i,--inputFile <arg>   指定解析的FsImage镜像文件

Optional command line arguments:
-o,--outputFile <arg>  解析后的输出文件,如果文件存在则覆盖
-p,--processor <arg>   选择镜像解析方式 (XML|FileDistribution|Web|Delimited)
                       (Web by default),方式说明详见上文
-delimiter <arg>       Delimited解析方式指定的分隔符
-t,--temp <arg>        Delimited解析时若镜像文件过大,文件的父子关系无法直接在内存执行,指定逻辑磁盘的临时目录

因为对于某些字段是不关注的,如文件RWX权限信息。为减少解析后文件大小,镜像解析可参考Delimited方式实现自定义扩展。解析后的FsImage镜像文件可以上传HDFS便于后续Spark离线任务并发读取镜像文件。

离线分析任务

基于解析后的文件元数据信息,可启动Spark离线任务进行镜像文件的统计计算,并把分析结果按照不同聚合维度持久化到数据库中,包括MySQL(PG)和HBase,若所有的HDFS目录信息都持久化,每天的数据达到1000万以上,传统关系型数据存储压力大,因此采用HBase存储统计后的文件目录信息,HBase数据查询基于Phoenix实现。

小文件合并

实现思路

除了Hadoop系统提供的合并方法,开发者可以通过外置功能来实现小文件合并,以下给出基于Spark自定义任务实现小文件合并的思路:

  1. 解析NameNode镜像文件:FsImage镜像文件持久化HDFS的所有文件元数据信息,保存在NameNode节点下的数据目录下,FsImage文件有前缀fsimage_,基于解析FsImage可以得到全量的NameNode元数据信息
  2. 文件识别:识别指定路径下,HDFS文件的类型与压缩方式
  3. 拼装执行规则:根据不同的文件类型和压缩方式,拼装对应的执行规则,在合并过程中,针对相同文件类型进行合并,而合并前需要将压缩文件先解压后再合并

支持的合并类型:(1). HDFS 文件类型;(2). 分区表类型

小文件合并需要用户主动触发的,系统不会自动执行文件合并,文件合并是个危险操作,合并前操作人员需要确保该目录下文件合并后不影响业务使用,或者合并后需要主动告知业务,文件使用方式变化,即小文件的合并是跟具体的业务使用挂钩的,不能进行随意合并,小文件合并的执行流程如下:

  • 通过界面操作选择小文件合并的方式和合并的目录(表分区)信息;
  • 选择小文件合并基本信息后,会自动识别待合并文件的文件类型压缩格式,并统计待合并总数和存储量;选择 确定 执行合并任务;
  • 后台创建待执行的合并任务,以Action执行提交的Spark离线合并任务;

文件识别

合并前需要识别HDFS文件类型和压缩方式

  • 基于HDFS FileSystem 遍历获取指定目录所有文件列表,若文件超过合并阈值则忽略;获取路径下的所有待合并小文件列表;
  • 基于待合并文件列表,识别文件类型,类型识别基于读取文件获取文件头三个字节,根据文件头类型判断文件类型,如果文件头类型无法匹配,则读取整个文件,判断MimeType是否为文本类型;
  • 基于识别出的文件类型,随机读取待合并文件,获取文件的压缩方式;

文件头(MimeType)与文件类型对应表:

文件头/MimeType

文件类型

text/plain

TEXT File

ORC

ORC File

SEQ

Sequence File

Obj(Objavro)

AVRO File

PAR

PARQUET File

文件后缀名与压缩方式对应表:

文件后缀名

压缩方式

压缩类

.deflate

DEFLATE压缩

org.apache.hadoop.io.compress.DefaultCodec

.gz

GZIP压缩

org.apache.hadoop.io.compress.GzipCodec

.bz2

BZIP2压缩

org.apache.hadoop.io.compress.BZip2Codec

.lzo_deflate

LZO压缩

io.airlift.compress.lzo.LzoCodec

.lz4

LZ4压缩

org.apache.hadoop.io.compress.Lz4Codec

.snappy

SNAPPY压缩

org.apache.hadoop.io.compress.SnappyCodec

.lzo

LZOP压缩

io.airlift.compress.lzo.LzopCodec

.bz2

ZLIB压缩

org.apache.orc.impl.ZlibCodec

其他

不进行压缩

以下操作简述不同的HDFS文件格式的判断与获取:

AVRO文件类型:

代码语言:java
复制
private CodecType readAvroCodec(Path path) throws Exception {
    try (DataFileStream reader = new DataFileStream(fs.open(path), new GenericDatumReader())) {
        Schema schema = reader.getSchema();
        String codec = reader.getMetaString("avro.codec");
        LOG.info("read avro file by header info of schema={}, codec={}", schema, codec);
        return CodecType.getTypeByName(codec, CodecType.NONE);
    }
}

SEQ(Sequence)文件类型:

代码语言:java
复制
private CodecType readSeqCodec(Path path) throws Exception {
    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
        CompressionCodec compressionCodec = reader.getCompressionCodec();
        LOG.info("read seq file with compression type={}, codec={}", reader.getCompressionType().name(), compressionCodec.getCompressorType().getName());
        return CodecType.getTypeByCodec(compressionCodec.getClass().getName(), CodecType.NONE);
    }
}

ORC文件类型:

代码语言:java
复制
private CodecType readORCCodec(Path path) throws Exception {
    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
    CompressionKind codec = reader.getCompressionKind();
    LOG.info("read orc file of path={}, codec={}", path, codec);
    return CodecType.getTypeByName(codec.name(), CodecType.NONE);
}

PARQUET文件类型:

代码语言:java
复制
private CodecType readParquetCodec(Path path) throws Exception {
    ParquetMetadata metadata = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
    try {
        codec = metadata.getBlocks().get(0).getColumns().get(0).getCodec();
    } catch (Exception e) {
        LOG.error("get codec info of parquet file error", e);
    }
    return CodecType.getTypeByName(codec.getParquetCompressionCodec().name(), CodecType.NONE);
}

TEXT和其他类型,根据文件后缀名识别

代码语言:java
复制
private CodecType detectCodecTypeByExt(DirPathInfo dirPathInfo) {
    List<Path> randomPaths = this.getRandomPaths(dirPathInfo.getFilePaths());
    CodecType finalCodec = null;
    boolean same = true;
    for (Path path : randomPaths) {
        String extension = FilenameUtils.getExtension(path.getName());
        CodecType codec = CodecType.getTypeByExtension(extension);
        Preconditions.checkNotNull(codec, "[merge.detect_codec_null] can't get dir path info of codecType");
        if (finalCodec == null) {
            finalCodec = codec;
        }
        if (finalCodec.getClass() != codec.getClass()) {
            same = false;
            break;
        }
    }
    Preconditions.checkArgument(same, "[merge.detect_codec_mismatch] file codec type is not same for dir=" + dirPathInfo.getDirPath());
    return finalCodec;
}

文件读写

基于Spark作业执行合并时,需要保证合并前后的文件类型和压缩压缩方式一致。不同文件类型的读写实现如下:

TEXT读写

代码语言:java
复制
spark.read().textFile(dirInfo.getFilePathSeq())
            .coalesce(1).write().option("compression", sparkCodec).text(targetPath);

ORC读写

代码语言:java
复制
spark.conf().set("spark.sql.orc.impl", "native");
spark.read().orc(dirInfo.getFilePathSeq())
            .coalesce(1).write().option("compression", sparkCodec).orc(targetPath);

AVRO读写

代码语言:java
复制
spark.conf().set("spark.hadoop.avro.mapred.ignore.inputs.without.extension", false);
spark.conf().set("spark.sql.avro.compression.codec", sparkCodec);
spark.read().format("com.databricks.spark.avro").load(dirInfo.getFilePathSeq())
            .coalesce(1).write().format("com.databricks.spark.avro").save(targetPath);

PARQUET读写

代码语言:java
复制
spark.conf().set("spark.sql.parquet.binaryAsString", "true");
spark.conf().set("spark.sql.parquet.enableVectorizedReader", "false");
spark.read().parquet(dirInfo.getFilePathSeq())
            .coalesce(1).write().option("compression", sparkCodec).parquet(targetPath);

SEQUENCE读写

代码语言:java
复制
Path randomPath = new FileManager(conf).getRandomPath(dirInfo);
SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(randomPath));
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
if (codec != CodecType.NONE) {
            sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
            sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", codec.getCodec());
            sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
}
sc.sequenceFile(dirInfo.getFilePathNames(), reader.getKeyClass(), reader.getValueClass())
            .map(tuple -> new Tuple2(tuple._1.toString(), tuple._2.toString()))
            .coalesce(1).saveAsObjectFile(targetPath);

总结

资源管理组件是大数据平台中的运维组件,对于普通用户基本不可见,更多是辅助管理员进行运维管理。一个好的资源管理组件,会尽可能的将资源可视化,提供资源优化的运维建议,帮助运维发现资源瓶颈,及时进行资源扩缩容调整。

本文主要概述了资源管理组件的背景及系统核心能力;针对存储资源,介绍了HDFS四个优化技术点:分层存储、纠删码、NameNode Federation和小文件合并;最后,介绍了小文件合并的相关的实现细节,主要包括:镜像解析、离线分析任务、和自定义小文件合并实现。

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景介绍
  • 核心能力
  • 优化技术
    • 1. HDFS分层存储
      • 2. HDFS纠删码
        • 3. HDFS Federation
          • 4. 小文件合并
          • 实现细节
            • 镜像解析
              • 离线分析任务
                • 小文件合并
                  • 实现思路
                  • 文件识别
                  • 文件读写
              • 总结
              相关产品与服务
              大数据处理套件 TBDS
              腾讯大数据处理套件(Tencent Big Data Suite,TBDS)依托腾讯多年海量数据处理经验,基于云原生技术和泛 Hadoop 生态开源技术提供的可靠、安全、易用的大数据处理平台。 TBDS可在公有云、私有云、非云化环境,根据不同数据处理需求组合合适的存算分析组件,包括 Hive、Spark、HBase、Flink、Presto、Iceberg、Elasticsearch、StarRocks 等,以快速构建企业级数据湖仓。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档


              http://www.vxiaotou.com