Hive使用手册

从零开始学Hive

Posted by Lyon Ling on July 12, 2019

[TOC]

1. Hive & HiveQL

  • Hive 是 Hadoop 生态系统中基于HDFS分布式文件系统构建的分布式数据仓库
  • HiveQL 是针对Hive的数据库查询语言,其中封装了MapReduce的查询方法,和传统数据库的差别是:
    • Hive是数据仓库,只支持增(insert)查(select),而不支持删(delete)改(update)
    • 增加分区的概念,即分区是物理上的“文件夹”,也是逻辑上的“伪列”
    • 提供用户自定义函数(UDF)的功能,便于开发
    • 提供视图(view)的概念(和正常数据仓库类似),只能由于查找,不能做插入

2. 使用模式

2.1 交互模式

1
2
# 在终端直接输入hive会进入hive shell直接进行交互,记得每句指令后要加分号;结尾
$ hive

2.2 终端模式

1
2
3
# -v visualize, 输出执行过程到终端
# -e execute, 执行指定HQL语句
$ hive -v -e "select * from dbName.tbName limit 10"

3. 初始设置

以下命令都在Hive shell下执行

3.1 资源队列

1
2
3
# 在大型集群中,对于不同的项目和任务都会分配指定计算机资源到专属队列中
# 所以在使用Hive时可能需要指定特定资源队列
> SET mapred.job.queue.name=queueName;

3.2 指定数据库

1
> USE databaseName;

3.3 加载UDF

UDF全程User defined function,指用户基于Java开发的自定义函数。使用前需要将相互依赖的UDF.jar包一次性载入,否则可能会加载失败。

3.3.1 动态加载

1
2
3
> ADD JAR hdfs://nameNode/path/a.jar hdfs://nameNode/path/b.jar;
> CREATE TEMPORARY FUNCTION udfFunc1 as 'com.abc.udf1';
> CREATE TEMPORARY FUNCTION udfFunc2 as 'com.abc.udf2';

这种方式的好处是比较灵活,问题在于加载的UDF只存在于当前Session中,断开对话就消失了,下次使用还需要重新加载。

3.3.2 修改hive-site.xml文件

另一种方法是,修改参数hive.aux.jars.path的值指向UDF文件所在的路径,该参数需要手动添加到hive-site.xml文件中。

1
2
3
4
<property>
<name>hive.aux.jars.path</name>
<value>file:///jarpath/all_new1.jar,file:///jarpath/all_new2.jar</value>
</property>

3.3.3 创建auxlib目录

比较简答的一种方法是,在${HIVE_HOME}下创建auxlib目录,将UDF文件放到该目录中,这样hive在启动时会将其中的jar文件加载到classpath中。

4. 表操作

4.1 DDL 数据定义语言

4.1.1 create 创建

下面有几种常见造表操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
-- 造无分区表
CREATE TABLE IF NOT EXISTS test_table(
  id int,
  name string,
  age int,
  tel string
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\001" 	-- 列分隔符
LINES TERMINATED BY "\n" 			-- 行分隔符
ROW FORMAT SERDE "org.apache.hadoop.hive.ql.io.orc.OrcSerde"		-- 序列化格式
STORED AS
INPUTFORMAT "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"		-- input格式
OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"; -- output格式

-- 造分区表
CREATE TABLE IF NOT EXISTS test_table(
  id int,
  name string,
  age int,
  tel string
) 
PARTITIONED BY (y string, m string, d string)
ROW FORMAT DELIMITED
STORED AS TEXTFILE; 		-- 内置基本文件格式

-- 复制空表结构
CREATE TABLE IF NOT EXISTS newTable LIKE oldTable;

-- 通过已有表和数据造表并插入空值
CREATE TABLE IF NOT EXISTS newTable AS SELECT key, features FROM oldTable WHERE y='2019';

从已有表生成create-table-as-select(CTAS)

  • CTAS创建的表是原子性的,这意味着,只有当该表所有查询完成后,用户才能看到完整的结果表。
  • CTAS唯一的限制是目标表:即目标表不能是有个有分区的表,也不能是外部表。

三种内置存储格式

  • TEXTFILE 是最普通的文件存储格式,内容可以直接查看。
  • SEQUENCENFILE 基于行存储,将表中的数据以二进制格式编码,并且支持数据压缩,可以节省存储空间。
  • RCFILE 基于列存储,将表中的数据以二进制格式编码,并且支持压缩。 因为列值重复多,所以压缩效率高。占用磁盘存储空间小,io压力小。对包含大量字段的表而言,更适合RCFILE存储。

4.1.2 drop 删除

常见删表操作

1
DROP TABLE IF EXISTS testTable;

drop 或者 insert overwrite 的表会暂时被移动到HDFS的 hdfs://namenode/tmp/username/taskid/ 路径下以供恢复。

4.1.3 alter 更改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- 增加分区
ALTER TABLE testTable ADD PARTITION (y='2017') PARTITION (y='2016');

-- 增加分区,多个分区需要指定前后顺序
ALTER TABLE testTable ADD PARTITION (y='2017', m='03') location '/y=2017/m=03';

-- 删除分区
ALTER TABLE testTable DROP PARTITION (y='2017') PARTITION (y='2016');

-- 表重命名
ALTER TABLE old_table RENAME TO new_table;

-- 添加列
ALTER TABLE testTable ADD COLUMNS (extra_col1 int COMMENT 'New Col1',
                                   extra_col2 int COMMENT 'New Col2');                                
-- 替换全部列
ALTER TABLE testTable REPLACE COLUMNS (new_col1 int COMMENT 'New Col1',
                                       new_col2 int COMMENT 'New Col2');

4.1.4 load 装载数据

load从外部文件载入数据并转化为Hive的分布式数据文件。

1
2
3
4
5
6
7
8
-- 往无分区表装载并覆盖
LOAD DATA INPATH 'path/data.csv' OVERWRITE INTO TABLE test_table;

-- 往指定分区装载并覆盖
LOAD DATA INPATH 'path/data.csv' OVERWRITE INTO TABLE test_table PARTITION (y='2019');

-- 往指定分区装载并追加
LOAD DATA INPATH 'path/data.csv' INTO TABLE test_table PARTITION (y='2019');

4.2 DML 数据操作语言

4.2.1 select 查询

1
2
3
4
5
6
7
8
9
10
11
-- 例子:统计不同年龄/性别组的人数
SELECT b.age, c.sex, count(*) FROM 
(SELECT id FROM table_1) a
JOIN 
(SELECT id, employee_id, age FROM table_2) b
ON a.id = b.id
JOIN
(SELECT employee_id, sex FROM table_3) c
ON b.employee_id=c.employee_id
GROUP BY b.age, c.sex
ORDER BY b.age, c.sex DESC;

select 嵌套查询会影响性能,有时候可以提速,有时候会降速,取决于mapper-reducer的分配机制。

查询关键词
  • distinct 去重 distinct 对结果集去重,一般会启用MapReduce并行去重。distinct 一般会对全部选择字段去重,并不能针对其中部分字段去重。

    count distinct 去重统计会将reducer的数量强制限制为1,因而应该写为子查询提高效率。

    1
    2
    3
    4
    5
    6
    
    -- 罗列不同性别和年龄的组合
    SELECT DISTINCT sex, age FROM test_table;
      
    -- 使用子查询优化,罗列不同性别和年龄的组合
    SELECT COUNT(*) FROM
    (SELECT DISTINCT sex, age FROM test_table) tb;
    
  • group by 分组 group by条件可以使字段也可以是函数结果,会将数据分组再在每组内分别执行聚合函数。 group by依赖的字段必须是已有的或者在子查询内的,不能是在同一层查询的字段别名。 group by不能和distinc同时使用。

    1
    2
    3
    4
    5
    
    -- 按性别、年龄分组并统计人数
    SELECT sex, age COUNT(*) FROM test_table GROUP BY sex, age;
      
    -- 随机分组并统计平均年龄
    SELECT AVG(age) FROM test_table GROUP BY CAST(rand()*10 AS int);
    
  • order bysort by 排序 order by会对输出做全局排序,因此会触发一个reducer,当输出规模较大是,需要较长计算时间和存储空间。 sort by不是全局排序,其在数据进入reducer之前完成排序。可以通过set mapred.reduce.tasks=<numer>设置reducer个数。结果集在单个reducer内有序但对全局未必有序,所以常辅以order by优化性能。

    1
    2
    3
    4
    5
    6
    7
    8
    
    -- 按年龄全局倒序排序曲最年迈的10个人
    SELECT id, age FROM test_table GROUP BY age DESC LIMIT 10;
      
    -- 通过sort by预筛选而优化order by性能,同时避免数据过大而无法order by
    SET mapred.reduce.tasks=10;
    SELECT id, age FROM 
    SELECT id, age FROM test_table SORT BY age DESC LIMIT 10*10) t
    ORDER BY age DESC LIMIT 10;
    
  • limit 限制 limit 结果集的顺序是随机的,和存储文件的顺序有关。 limit 的执行顺序亚于order by

4.2.2 insert 插入

insert的数据源是Hive内已经存在的表。

insert into 是在文件末尾追加数据,而insert overwrite是覆盖全部数据文件。

讲一个表的数据拆分到多个分区需要注意合理使用insert语法,避免多次扫描全表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
-- 增加数据
INSERT INTO TABLE test_table SELECT * FROM source_table;

-- 覆盖原分区的数据
INSERT OVERWRITE TABLE test_table
PARTITION (y='2017')
SELECT * FROM source_table;

-- 一次扫描多路输出
FROM source_table st
INSERT OVERWRITE TABLE target_table
		PARTITION (y='2017' AND m='03')
		SELECT * WHERE st.y='2017' AND st.m='03'
INSERT OVERWRITE TABLE target_table
		PARTITION (y='2017' AND m='04')
		SELECT * WHERE st.y='2017' AND st.m='04'
INSERT OVERWRITE TABLE target_table
		PARTITION (y='2017' AND m='05')
		SELECT * WHERE st.y='2017' AND st.m='05';

4.2.3 join 连接

性能优化
  • Hive的 join 只支持但条件或多条件的等值 = 关联,不支持 or 关联。
  • join 的结果集不能有重名字段。
  • reducer 会缓存 join 序列中除了最后一个表的所有表的记录,再通过最后一个表将结果序列化到文件系统,因此实践中应该把最大的那个表写在最后,否则会因为缓存浪费大量内存。
  • 如果两张大宽表都有大量字段,但结果集仅保留少量字段,则建议用子查询优先筛选字段,避免占用太多缓存甚至是OOM任务失败。
1
2
3
4
5
6
7
8
9
-- 通过子查询过滤字段
SELECT a.id, b.age, c.sex FROM
(SELECT id FROM table_1) a
JOIN
(SELECT idemployee_idage FROM table_2) b
ON a.id = b.id
JOIN
(SELECT employee_id, sex FROM table_3) c
ON b.employee_id = c.employee_id;
  • 如果所有表之间没有公共关联键,则会触发多个MR stage依次执行,如果所有表之间存在同一公共关联键,则用最大表为左表可以让MR stage并行执行。
1
2
3
4
5
6
7
8
9
-- Parallel Join
SELECT a.id, b.age, c.sex FROM
(SELECT id FROM table_1) a
JOIN
(SELECT idage FROM table_2) b
ON a.id = b.id
JOIN
(SELECT id, sex FROM table_3) c
ON a.id = c.id;
  • join 之前需要确保关联键是否去重,是不是刻意保留非去重结果,要警惕数据倾斜的发生。
数据倾斜

如果两张 10^8^ 数量级的表未对关联键去重,且每张表有10%的数据的关联键为空,则此部分脏数据就会形成10^14^ 数据量的文件,而导致Reducer阶段始终卡在99%。

1
2
3
4
5
6
-- 对关联键进行非NULL非空过滤,避免数据倾斜
SELECT a.id, a.age, b.sex FROM
(SELECT distinct id, age FROM table_1 WHERE id IS NOT NULL AND id <> '') a
JOIN
(SELECT distinct id, age FROM table_2 WHERE id IS NOT NULL AND id <> '') b
ON a.id = b.id;
3种类型的 join
  • join : Hive 的 join 默认是 inner join , 找出左右都可匹配的记录。
  • left join : 左连接,以左边表为准,逐条去右边表找可匹配的字段,如果有多条会依次列出。
  • full outer join : 包含两个表的连接结果,左表缺失或右表缺失的数据会填充NULL。
1
2
3
4
5
6
-- 左右两表连接取并集的sql模板
SELECT nvl(a.id, b.id) AS id, a.age, b.sex FROM
(SELECT id, age FROM table1) a
FULL OUTER JOIN
(SELECT id, sex FROM table2) b
ON a.id = b.id;

4.2.4 union 合并

unionunion all 均基于列合并多张表的数据,所合并的列格式必须完全一致。

union 过程中会去重并降低效率,而 union all 直接追加数据

union 前后是两段 select 语句而非结果集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-- 不去重合并两张表的数据
CREATE TABLE IF NOT EXISTS union_table as
SELECT * FROM
(
    SELECT * FROM table_1
    UNION ALL
    SELECT * FROM table_2
)t;

-- 针对两个限制行数的子表进行union需要额外嵌套select
CREATE TABLE IF NOT EXISTS union_table as
SELECT * FROM
(
    SELECT * FROM (SELECT * FROM table_1 LIMIT 100) tb1
    UNION ALL
    SELECT * FROM (SELECT * FROM table_2 LIMIT 200) tb2
)

4.2.5 导出数据

导出数据可以通过命令行模式或者交互模式实现

1
2
3
4
5
6
-- 交互模式
INSERT OVERWRITE LOCAL DIRECTORY 'path/data.csv'
SELECT * FROM test_table LIMIT 100;

-- 命令行模式
hive -v -e "SELECT * FROM test_table LIMIT 100;" >path/data.csv

(*)命令行模式中,会出现secureCRT中hive的显示错误(缺少空格,tab,数据黏连,数据缺少等),交互模式可以避免该问题。

4.3 常用函数汇总

4.3.1 !=/<>/= 比较

针对string类型的比较适合用 <> 不等与 = 相等, 针对数值类型的比较适合用 !==

判断是否是NULL适合直接用 is nullis not null

1
2
3
4
5
-- 过滤脏姓名数据
SELECT * FROM table1 WHERE name <> '' AND name IS NOT NULL;

-- 过滤脏年龄数据
SELECT * FROM table2 WHERE age > 0.0 AND age IS NOT NULL;

4.3.2 字符串函数

基本函数
  • concat(str1, str2) 进行字符串合并。
  • length(str) 返回字符串长度。
  • substr(str,0,12) 截取字符串从0位开始的12个字符。
匹配与正则函数
  • like 做普通文本匹配, rlike 做正则匹配,且在HIve2上不在支持针对int类型的匹配。
  • regexp_replace 做正则替换, regexp_extract 做正则匹配。
1
2
3
4
5
6
-- 获取基金有关数据
SELECT * FROM test_table WHERE product_type LIKE "%基金%"

-- 将"2017-04-01 00:00:00"格式的时间先提取年月日再去除"-"符号,最后转为int
-- 0对应完整匹配结果,1/2/3则对应第1/2/3和()内的匹配结果
SELECT CAST(regexp_replace(regexp_extract(reg_time,"([0-9]+)-([0-9]+)-([0-9]+)",0),"-","") AS int) FROM test_table;

4.3.3 count/sum/avg/max/min 聚合函数

基本使用

聚合函数在同一层查询中可以组合使用,但不可能互相嵌套使用。

如果需要实现诸如max(avg(feature))的效果,则需要使用嵌套的 select

1
2
3
4
5
6
7
8
-- 简单统计年龄分布
SELECT COUNT(*), AVG(age) as avg_age, MAX(age) as max_age FROM test_table;

-- 统计不同性别的年龄分布
SELECT sex, COUNT(*), AVG(age) as avg_age, MAX(age) as max_age FROM test_table GROUP BY sex

-- 结合case when使用统计有效数据量
SELECT COUNT(CASE WHEN income IS NOT NULL 1 ELSE NULL END) FROM income_record;
countsum 的区别

sum 针对所有数值求和, count 将所有非NULL值记为1后统计。

1
2
-- 针对Value字段的有值率统计
SELECT COUNT(IF(value>0.0,1,NULL))/COUNT(*) FROM test_table;

4.3.4 case 条件函数

case 函数通常格式为 (case when condition1 then value1 else null end) , 其中else 默认条件可省,end 关键词必不可省。

case when 中不能包含聚合函数,否则会报错。

1
2
3
4
5
6
7
8
9
10
11
12
-- 收入区间分组
SELECT id,
(CASE
 WHEN CAST(income as float) < 50000 THEN '0~5万'
 WHEN CAST(income as float) >= 50000 AND CAST(income as float) < 1000000 THEN '5~10万'
 WHEN CAST(income as float) >= 100000 AND CAST(income as float) < 1500000 THEN '10~15万'
 WHEN CAST(income as float) >= 150000 AND CAST(income as float) < 2000000 THEN '15~20万'
 WHEN CAST(income as float) >= 200000 AND CAST(income as float) < 2500000 THEN '20~25万'
 WHEN CAST(income as float) >= 250000 AND CAST(income as float) < 3000000 THEN '25~30万'
 WHEN CAST(income as float) > 300000 THEN '30万以上'
 ELSE NULL END)
 FROM test_table;

4.3.5 if 条件函数

if 函数是简化版的 case 函数,且可以通过嵌套实现多分类。

1
2
-- 将年龄分为四段
SELECT id, IF(age>50, IF(age>75,"晚年","中老年"),IF(age>25,"青壮年","青少年")) FROM test_table;

4.3.6 in 范围函数

HIve不支持在 in 函数嵌套 select 语句。

1
2
-- 提取已知性别的数据
SELECT * FROM test_table WHERE sex IN("M","F");

4.3.7 where 过滤条件

where 过滤条件针对 group by 前的结果集筛选, 优先级高于 group by

1
2
3
4
-- 统计不同公司的男性平均年龄
SELECT company, AVG(age) FROM test_table
WHERE sex = "M"
GROUP BY company;

4.3.8 having 过滤条件

having 过滤条件针对 group by 后的结果集筛选,优先级低于 group by

1
2
3
4
5
-- 统计不同公司的男性平均年龄,且仅保留平均年龄在50岁以上的公司
SELECT company, AVG(age) FROM test_table
WHERE sex = "M"
GROUP BY company
HAVING AVGA(age) > 50;

4.3.9 nvl 空值填充函数

NVL(string1, replace_with) 用法,如果string1为NULL,则nvl 函数返回replace_with的值,否则返回string1的值。

1
2
-- 无收入的客户默认值为0
SELECT customer_id, NVL(salary,0) AS salary FROM test_table;

4.3.10 cast 类型转换

cast as 常用于string/int/double之间的类型转换

1
2
-- 把string类型保存的数字还原为int
SELECT CAST(value_string as INT) FROM test_table;

4.3.11 row_number 编号函数

1
2
3
4
5
-- 按照c2字段倒叙编号
SELECT *, row_number() OVER(ORDER BY c2 DESC) AS row_num FROM test_table;

-- 按照c1字段分组后再按c2字段倒叙编号
SELECT *, row_number() OVER(PARTITION BY c1 ORDER BY c2 DESC) AS row_num FROM test_table;

4.3.12 percentile 百分位函数

如果只查询一个分位, 则 percentile 的返回结果是数值, 如果查询多个分位, 则结果是string类型的array.

如果不加 where 条件限定, 则 percentile 的计算过程包含0值, 往往会造成结果偏移.

1
2
3
4
5
-- 获取income字段的10个分位值
SELECT percentile(CAST(income AS int), ARRAY(0.0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0)) AS income_percentiles FROM test_table;

-- 获取income段字Top10%的阈值
SELECT percentile(CAST(income AS int),0.9)) AS income_top10p_threshold FROM test_table;

4.3.13 to_date 时间函数

to_date() 时间函数可以把时间字符串转换成时间类型, 再计算时间差

常用日期提取函数包括 year(dstr)/month(dstr)/day(dstr)/hour(dstr)/minute(dstr)/second(dstr)

日期运算函数包括 datediff(enddate,startdate)/date_sub(startdate,days)/date_add(startdate,days)

1
2
3
4
5
-- 转换时间数据的格式
SELECT to_date("1970-01-01 00:00")SELECT to_date("1970-01-01 00:00:00") AS start_time FROM test_table;

-- 计算所有数据到当前时间的天数差
SELECT datediff(to_date(from_unixtime(unix_timestamp())),record_date) from test_table;

4.3.14 split字符串分隔函数

split(str,regex) 函数用于将string类型数据按regex提取, 分隔后转换为array.

数据拆分为多个列.

1
2
3
4
5
6
-- 以","为分隔符分隔字符串, 并转化为array
SELECT split("1,2,3", ",") AS value_array FROM test_table;

-- 结合array index, 将原始字符串分割成3列
SELECT value_array[0] AS value1, value_array[1] AS value2, value_array[2] AS value3 FROM
(SELECT split("1,2,3", ",") AS value_array FROM test_table)t;

4.3.15 explode 转置函数

explode 转置单列使用时可以把单列数据拆分成多行.

explode 转置单列并保留其他列时需要结合 lateral view 使用.

1
2
3
4
5
6
-- array(1,2,3)变成了3列
SELECT explode(ARRAY(1,2,3)) FROM test_table;

-- 保留主键列, 并将存储array(1,2,3)的array_col列转置
SELECT id, exploded_col FROM source_table
LATERAL VIEW explode(array_col) view_alias AS exploded_col;

4.3.16 with cube 分析功能

with cube 功能实现了笛卡尔积运算

如果对(a,b,c)字段组进行 group by with cube, 则相当于 (a,b,c),(a,b),(a,c),(b,c),(a),(b),(c),() 这8种情况分别聚合统计.

1
2
3
-- 统计a,b,c这仨个字段在所有组合下的记录数
SELECT a,b,c,COUNT(*) FROM test_table
GROUP BY a,b,c WITH CUBE;

5. 其他操作

5.1 数据迁移

  • 在新路径下创建一张相同结构的表,字段结构、serde formatstored as 类型必须完全一致

    create table migrated_table(
      id string,
      number int,
      key_double double,
      key_string string
    ) row format delimited fields terminated by '\t';
    

    这种方式一般属于其他操作无法正常使用下的方法,当表过于复杂容易出错。

  • 使用 distcp 指令进行迁移

    1
    2
    3
    
    # -skipcrccheck 规避跨Hadoop版本迁移的数据检验
    # hftp方式适用于跨集群迁移,hdfs适合集群内迁移
    $ hadoop distcp -Dmapred.job.name=queueName -update -skipcrccheck hftp://domain:ip/user/sourceDestination user/targetDestination
    
  • 如果迁移的是分区表,则需要预先用 alter table 指令造好分区文件夹,如果分区过多,则可以用 distcp 分批复制分区数据

5.2 数据恢复

首先要明白几点,被 truncate 的数据是无法恢复的,但是被 drop 或者 insert overwrite 的表第一时间是可以恢复的。

恢复的具体操作如下:

  • 建立一张相同格式的表 newTable, 存储格式和字段类型必须完全一致。

  • tmp 目录中把数据复制出来:

    1
    
    $ hadoop fs -cp /tmp/username/stageing_hive_YYYY-MM-DD_id/* path/databaseName.db/newTable/*
    

5.3 查询表更新情况

  • 针对分区表可以直接查询最新分区来判断数据是否正确更新。

  • 针对无分区表desc formatted 的结果往往不准确,所以要用Hadoop的指令去查询数据文件的更新时间。

    1
    
    $ hadoop fs -ls path/databaseName.db/tableName/