大数据组件-Hive
1.Hive概念Hive 是基于 Hadoop 的数据仓库工具用于处理大规模结构化数据。它将 SQL 查询转换为 MapReduce 任务简化了分布式计算操作。Hive 适合离线批处理场景支持数据存储、查询和分析。2.Hive核心特性类 SQL 语法HQL支持类似 SQL 的查询语言降低学习成本。数据存储数据以表形式存储在 HDFS 中支持文本、Parquet、ORC 等格式。元数据管理通过元存储Metastore管理表结构、分区等信息。扩展性支持自定义函数UDF、SerDe序列化/反序列化等扩展功能。3.Hive 的架构组件Driver接收 HQL 查询生成执行计划并协调任务执行。Compiler解析 HQL优化逻辑计划并转换为物理计划如 MapReduce。Metastore存储表定义、分区等元数据通常使用关系型数据库如 MySQL。Execution Engine默认使用 MapReduce也可配置为 Tez 或 Spark。4.Hive 的数据模型表Table逻辑概念数据存储在 HDFS 的目录中。分区Partition按列值划分数据加速查询如按日期分区。分桶Bucket对数据哈希分片优化 JOIN 和采样效率。5.Hive的局限性延迟高批处理模式不适合实时查询。事务支持有限早期版本不支持 ACID新版本通过 Hive 3.x 改进。6.Hive内部表和外部表内部表MANAGED_TABLEHive 全权管理数据和元数据外部表EXTERNAL_TABLEHive 只管理元数据数据由 HDFS 管理7.Hive元数据存储默认有自己的数据开Derby但是生产中常用mysqlhive存储的元数据是描述数据信息的数据比如:数据库、表名字段名、字段类型表是内部表还是外部表分区信息dt20260401 这些数据在 HDFS 上的存储路径分桶信息、存储格式ORC/ParquetSerDe 信息8.Hive的分区和分桶分区概念分区是一种将大型表的数据根据特定维度通常是日期、地区、类别等列物理划分到不同子目录中的技术。目的优化查询性能 当查询语句的WHERE子句包含分区键时Hive 可以只扫描相关分区的目录避免全表扫描显著减少 I/O 和数据量。这称为分区裁剪。数据管理 便于按分区加载、删除或归档数据。-- 创建分区表 CREATE TABLE logs ( id INT, user_id STRING, event_type STRING ) PARTITIONED BY (event_date STRING, region STRING) STORED AS ORC;注意事项避免分区过多如按天分区持续多年可能导致成千上万分区过多的小文件会降低 NameNode 性能和查询效率。分区键应选择常用于WHERE过滤的列且基数不宜过高。分桶概念分桶是将一个表或分区内的数据进一步划分为固定数量的文件桶。划分规则基于对分桶键一列或多列进行哈希取模计算。例如一个分区内按user_id分成 4 个桶Hive 会根据hash(user_id) % 4的结果将数据放入对应的桶文件。目的高效采样可以快速对特定桶进行随机采样用于数据预览或测试。优化 JOIN如果两个表都以相同方式对 JOIN 键分桶且桶数量相同或成倍数关系Hive 可以执行高效的桶映射 JOIN。只需将对应桶的数据进行 JOIN无需对整个表进行 Shuffle 和 Reduce。优化聚合对于GROUP BY分桶键的查询可以更快完成。更均匀的数据分布有助于避免数据倾斜。-- 创建分桶表 (通常先分区再分桶) CREATE TABLE user_actions ( user_id INT, action STRING, timestamp BIGINT ) PARTITIONED BY (event_date STRING) CLUSTERED BY (user_id) INTO 4 BUCKETS STORED AS ORC;分区 vs 分桶 总结9.Hivesql的执行流程解析与编译首先SQL 语句会被 Client 提交到 Driver。Driver 内部的解析器 (Parser) 会将 SQL 转换成抽象语法树 (AST)。然后编译器 (Compiler)会去连接 Metastore获取 user_log 表的元数据进行语义分析并将 AST 编译成一个由逻辑操作符组成的 DAG有向无环图也就是逻辑执行计划。优化接着优化器 (Optimizer) 会对这个逻辑计划进行优化。针对写的 WHERE dt2023-11-25 这个条件优化器会执行分区裁剪 (Partition Pruning)确保计算任务只去扫描 dt2023-11-25 这个分区目录下的文件极大地减少了 IO。优化后会生成物理执行计划。提交与执行执行器 (Executor) 将物理计划一系列的 MapReduce 或 Spark 任务提交给 YARN 的 ResourceManager (RM)。资源分配与计算RM 会在集群中找一个 NodeManager (NM) 启动 ApplicationMaster (AM)。AM 负责向 RM 申请计算所需的资源Container然后在这些 Container 里启动 Map 和 Reduce Task。这些 Task 会去 HDFS 上读取指定分区下的数据文件进行计算。返回结果所有 Task 计算完成后最终的结果会汇总由 Driver 获取并返回到我的客户端屏幕上。”10.Hive的相关调优切记hive只是一个工具提供类似sql的操作方式本质是运行MR程序因此调优是在优化MR,而不是把mysql的优化拿过来。主要从四个方面来说hive的建表设计Hivesql本身的优化Hive配置参数 底层引擎MR方面的调整调优注意事项对于大数据计算引擎来说数据量大不是问题数据倾斜是个问题。Hive 的复杂 HQL 底层会转换成多个 MapReduce Job 并行或者串行执行Job 数比较多的作业运行效率相对比较低比如即使只有几百行数据的表如果多次关联多次汇总产生十几个 Job耗时很长。原因是 MapReduce 作业初始化的时间是比较长的。在进行 Hive 大数据分析时常见的聚合操作比如 sum, count, max, min, UDAF 等不怕数据倾斜问题MapReduce 在 Mapper 阶段的预聚合操作使数据倾斜不成问题。好的建表设计模型设计事半功倍。设置合理的 MapReduce 的 Task 并行度能有效提升性能。比如10w 数据量级别的计算用 100 个reduceTask那是相当的浪费1 个足够但是如果是亿级别的数据量那么 1 个 Task 又显得捉襟见肘。了解数据分布自己动手解决数据倾斜问题是个不错的选择。这是通用的算法优化但算法优化有时不能适应特定业务背景开发人员了解业务了解数据可以通过业务逻辑精确有效的解决数据倾斜问题。数据量较大的情况下慎用 count (distinct)group by 容易产生倾斜问题。对小文件进行合并是行之有效的提高调度效率的方法假如所有的作业设置合理的文件数对任务的整体调度效率也会产生积极的正向影响。优化时把握整体单个作业最优不如整体最优。优化 MySQL 的 SQL 优化技巧是不适用在 Hive 的。1hive的建表设计层面①利用分区表优化当一个hive表查询的大多数情况下都需要根据某个字段进行筛选时那么非常时候将该字段作为分区字段进行分区②利用分桶表优化分桶表最大的作用就是帮助我们提高join查询以及采样。select a.*, b.* from a join b on a.id b.id;解决方案:两张表都得按照这个连接字段来进行分桶分桶数量要成倍数关系③选择合适的存储格式原始数据一般来说都是普通文本格式计算的结果数据如果这种结果数据通常还要作为另外一种计算的输入那么可以让当前这个结果数据的格式易于被读取序列化文件格式可用如果这种结果数据就是最终的结果了那么看数据量的大小关系如果比较大可以考虑压缩如果不是特别大直接普通文本格式甚至存储在 RDBMS计算引擎中应用程序在执行的时候中间临时数据执行引擎的特性列式存储 序列化存储文件格式第一种: TextFile存储方式行存储。默认格式如果建表时不指定默认为此格式。每一行都是一条记录每行都以换行符 “\n” 结尾。数据不做压缩时磁盘会开销比较大数据解析开销也比较大。可结合 Gzip、Bzip2 等压缩方式一起使用系统会自动检查查询时会自动解压推荐选用可切分的压缩算法。第二种: Sequence File一种 Hadoop API 提供的二进制文件使用方便、可分割、可压缩的特点。支持三种压缩选择NONE、RECORD、BLOCK。RECORD 压缩率低一般建议使用 BLOCK 压缩。第三种: RC File存储方式数据按行分块每块按照列存储 。A、首先将数据按行分块保证同一个 record 在一个块上避免读一个记录需要读取多个 block。B、其次块数据列式存储有利于数据压缩和快速的列存取。相对来说RCFile 对于提升任务执行性能提升不大但是能节省一些存储空间。可以使用升级版的 ORC 格式。第四种: ORC File存储方式数据按行分块每块按照列存储 。Hive 提供的新格式属于 RCFile 的升级版性能有大幅度提升而且数据可以压缩存储压缩快快速列存取。ORC File 会基于列创建索引当查询的时候会很快。第五种Parquet File存储方式列式存储。Parquet 对于大型查询的类型是高效的。对于扫描特定表格中的特定列查询Parquet 特别有用。Parquet 一般使用 Snappy、Gzip 压缩。默认 Snappy。Parquet 支持 Impala 查询引擎。表的文件存储格式尽量采用 Parquet 或 ORC不仅降低存储量还优化了查询压缩表关联等性能。④选择合适的压缩格式压缩格式是否可拆分是否自带压缩率速度是否 hadoop 自带gzip否是很高比较快是lzo是是比较高很快否要安装snappy否是比较高很快否要安装bzip2是否最高慢是2Hive语法和运行参数1. 查看 Hive 执行计划Hive 的 SQL 语句在执行之前需要将 SQL 语句转换成 MapReduce 任务因此需要了解具体的转换过程可以在 SQL 语句中输入如下命令查看具体的执行计划。查看执行计划添加 extended 关键字可以查看更加详细的执行计划explain [extended] query例子:explain select department, count(*) as total from student where age 18 group by department order by total desc limit 3;关于 Hive 的执行计划中的 Operator 的概念: (逻辑执行计划: Operator Tree)select ... from ... where ... group by ... having ... order by ..... limit .....一条 Hive SQL 写完底层会被拆成一串 “操作步骤”这一串步骤就叫 Operator Tree算子树每个节点例如selectgroup by都是一个Operate2. 列裁剪列裁剪就是在查询时只读取需要的列分区裁剪就是只读取需要的分区。当列很多或者数据量很大时如果 select * 或者不指定分区全列扫描和全表扫描效率都很低。Hive 在读数据的时候可以只读取查询中所需要用到的列而忽略其他的列。这样做可以节省读取开销中间表存储开销和数据整合开销。set hive.optimize.cp true; ## 列裁剪取数只取查询中需要用到的列默认是true3. 谓词下推将 SQL 语句中的 where 谓词逻辑都尽可能提前执行减少下游处理的数据量。对应逻辑优化器是PredicatePushDown。set hive.optimize.ppdtrue; ## 默认是true示例程序select a.*, b.* from a join b on a.id b.id where b.age 20; select a.*, c.* from a join (select * from b where age 20) c on a.id c.id;4. 分区裁剪列裁剪就是在查询时只读取需要的列分区裁剪就是只读取需要的分区。当列很多或者数据量很大时如果select *或者不指定分区全列扫描和全表扫描效率都很低。在查询的过程中只选择需要的分区可以减少读入的分区数目减少读入的数据量。Hive 中与分区裁剪优化相关的规则set hive.optimize.prunertrue; ## 默认是true在 HiveSQL 解析阶段对应的规则是ColumnPruner逻辑优化器。示例select * from student where department AAAA;5. 合并小文件大数据技术组件最怕的就是两个事儿存储组件海量小文件SQL 示例1000000M 100000 × 10M→ 100000 个小文件计算组件数据倾斜SQL 示例1000000M 100000 × 10M→ 100000 个小文件但是其中有两个超级大如果一个 MapReduce Job 碰到一堆小文件作为输入一个小文件会启动一个 Task。在 MR 编程模型中提供了一个叫做CombineFileInputFormat的类具备把一个节点甚至一个机架上的多个小文件划分到同一个输入切片的能力。Hive 的默认InputFormatTextInputFormatMap 输入合并在执行 MapReduce 程序的时候一般情况是一个文件的一个数据分块需要一个mapTask来处理。但是如果数据源是大量的小文件这样就会启动大量的mapTask任务会浪费大量资源。可以将输入的小文件进行合并从而减少mapTask任务数量。Map 端输入、合并文件之后按照 block 的大小分割默认 set hive.input.formatorg.apache.hadoop.hive.ql.io.CombineHiveInputFormat; Map 端输入不合 set hive.input.formatorg.apache.hadoop.hive.ql.io.HiveInputFormat;Map/Reduce 输出合并大量的小文件会给 HDFS 带来压力影响处理效率。可以通过合并 Map 和 Reduce 的结果文件来消除影响。是否合并 Map 输出文件默认值为 true set hive.merge.mapfilestrue; ## 是否合并 Reduce 端输出文件默认值为 false set hive.merge.mapredfilestrue; ## 合并文件的大小默认值为 256000000 256M set hive.merge.size.per.task256000000; ## 每个 Map 最大分割大小 set mapred.max.split.size256000000; ## 一个节点上 split 的最小值 set mapred.min.split.size.per.node1; // 服务器节点 如果有多个节点都是只有一个小文件这种情况当中意味着压根没合并 如果这个节点有 300 个 1M 的文件。指定的输入切片大小是256M 44M 的数据会传送给其他节点做合并 如果这个节点有 500 个 1M 的文件。指定的输入切片大小是256M 244M 的数据会传送给其他节点做合并 ## 一个机架上 split 的最少值 set mapred.min.split.size.per.rack1; // 服务器机架6.合理设置Maptask并行度7.合理设置ReduceTask并行度8.join优化Join 优化整体原则:优先过滤后再进行 Join 操作最大限度的减少参与 join 的数据量where 能用就用小表 join 大表最好启动 mapjoinhive 自动启用 mapjoin小表不能超过 25M可以更改Join on 的条件相同的话最好放入同一个 job并且 join 表的排列顺序从小到大select a.*, b.*, c.* from a join b on a.id b.id join c on a.id c.i如果多张表做 join如果多个链接条件都相同会转换成一个 Job优先过滤数据尽量减少每个阶段的数据量对于分区表要用上分区字段的尽量使用同时只选择后面需要使用到的列最大限度的减少参与 Join 的数据量。小表 join 大表原则小表 join 大表的时应遵守小表 join 大表原则原因是 join 操作的 reduce 阶段位于 join 左边的表内容会被加载进内存将条目少的表放在左边可以有效减少发生内存溢出的几率。join 中执行顺序是从左到右生成 Job应该保证连续查询中的表的大小从左到右是依次增加的。使用相同的连接键在 hive 中当对 3 个或更多张表进行 join 时如果 on 条件使用相同字段那么它们会合并为一个 MapReduce Job利用这种特性可以将相同的 join on 放入一个 job 来节省执行时间。尽量原子操作尽量避免一个 SQL 包含复杂的逻辑可以使用中间表来完成复杂的逻辑。大表 Join 大表空 key 过滤有时 join 超时是因为某些 key 对应的数据太多而相同 key 对应的数据都会发送到相同的 reducer 上从而导致内存不够。此时我们应该仔细分析这些异常的 key很多情况下这些 key 对应的数据是异常数据我们需要在 SQL 语句中进行过滤。空 key 转换有时虽然某个 key 为空对应的数据很多但是相应的数据不是异常数据必须要包含在 join 的结果中此时我们可以表 a 中 key 为空的字段赋一个随机的值使得数据随机均匀地分布到不同的 reducer 上。9.启用 MapJoin关于 Hive 调优中“能用就用” 的原则和方法where —— 能用就用mapjoin —— 资源允许的情况下局部聚合 combiner —— 不改变业务结果的情况下MapJoin 是将 join 双方比较小的表直接分发到各个 map 进程的内存中在 map 进程中进行 join 操作这样就不用进行 reduce 步骤从而提高了速度。只有 join 操作才能启用 MapJoin。# 是否根据输入小表的大小自动将reduce端的common join 转化为map join将小表刷入内存中。 # 对应逻辑优化器是MapJoinProcessor set hive.auto.convert.join true; # 刷入内存表的大小(字节) 25M 2G set hive.mapjoin.smalltable.filesize 25000000; # hive会基于表的size自动的将普通join转换成mapjoin set hive.auto.convert.join.noconditionaltasktrue; # 多大的表可以自动触发放到内层LocalTask中默认大小10M set hive.auto.convert.join.noconditionaltask.size10000000;Hive 可以进行多表 Join。Join 操作尤其是 Join 大表的时候代价是非常大的。MapJoin 特别适合大小表 join 的情况。在 Hive join 场景中一般总有一张相对小的表和一张相对大的表小表叫 build table大表叫 probe table。Hive 在解析带 join 的 SQL 语句时会默认将最后一个表作为 probe table将前面的表作为 build table 并试图将它们读进内存。如果表顺序写反probe table 在前面引发 OOM 的风险就高了。在维度建模数据仓库中事实表就是 probe table维度表就是 build table。这种 Join 方式在 map 端直接完成 join 过程消灭了 reduce效率很高。而且 MapJoin 还支持非等值连接。当 Hive 执行 Join 时需要选择哪个表被流式传输stream哪个表被缓存cache。Hive 将 JOIN 语句中的最后一个表用于流式传输因此我们需要确保这个流表在两者之间是最大的。如果要在不同的 key 上 join 更多的表那么对于每个 join 集只需在 ON 条件右侧指定较大的表。也可以手动开启 mapjoin-- SQL方式在SQL语句中添加MapJoin标记mapjoin hint -- 将小表放到内存中省去shuffle操作 -- 在没有开启mapjoin的情况下执行的是reduceJoin SELECT /* MAPJOIN(smallTable) */ smallTable.key, bigTable.value FROM smallTable JOIN bigTable ON smallTable.key bigTable.key; /*mapjoin(a,b,c)*/ /*mapjoin(a)*/10.Sort-Merge-Bucket(SMB) Map Join它是另一种 Hive Join 优化技术使用这个技术的前提是所有的表都必须是分桶表bucket和分桶排序的sort。分桶表的优化distribute by .... sort by ....具体实现针对参与 join 的这两张做相同的 hash 散列每个桶里面的数据还要排序这两张表的分桶个数要成倍数。开启 SMB join 的开关一些常见参数设置-- 当用户执行bucket map join的时候发现不能执行时禁止查询 set hive.enforce.sortmergebucketmapjoinfalse; -- 如果join的表通过sort merge join的条件join是否会自动转换为sort merge join set hive.auto.convert.sortmerge.jointrue; -- 当两个分桶表 join 时如果 join on的是分桶字段小表的分桶数是大表的倍数时可以启用 mapjoin 来提高效率。 -- bucket map join优化默认值是 false set hive.optimize.bucketmapjoinfalse; -- bucket map join 优化默认值是 false set hive.optimize.bucketmapjoin.sortedmergefalse;