[TOC]
1. Hive & HiveQL
- Hive 是 Hadoop 生态系统中基于HDFS分布式文件系统构建的分布式数据仓库
- HiveQL 是针对Hive的数据库查询语言,其中封装了MapReduce的查询方法,和传统数据库的差别是:
- Hive是数据仓库,只支持增(
insert
)查(select
),而不支持删(delete
)改(update
) - 增加分区的概念,即分区是物理上的“文件夹”,也是逻辑上的“伪列”
- 提供用户自定义函数(UDF)的功能,便于开发
- 提供视图(view)的概念(和正常数据仓库类似),只能由于查找,不能做插入
- Hive是数据仓库,只支持增(
2. 使用模式
2.1 交互模式
1
2
# 在终端直接输入hive会进入hive shell直接进行交互,记得每句指令后要加分号;结尾
$ hive
2.2 终端模式
1
2
3
# -v visualize, 输出执行过程到终端
# -e execute, 执行指定HQL语句
$ hive -v -e "select * from dbName.tbName limit 10"
3. 初始设置
以下命令都在Hive shell下执行
3.1 资源队列
1
2
3
# 在大型集群中,对于不同的项目和任务都会分配指定计算机资源到专属队列中
# 所以在使用Hive时可能需要指定特定资源队列
> SET mapred.job.queue.name=queueName;
3.2 指定数据库
1
> USE databaseName;
3.3 加载UDF
UDF全程User defined function,指用户基于Java开发的自定义函数。使用前需要将相互依赖的UDF.jar
包一次性载入,否则可能会加载失败。
3.3.1 动态加载
1
2
3
> ADD JAR hdfs://nameNode/path/a.jar hdfs://nameNode/path/b.jar;
> CREATE TEMPORARY FUNCTION udfFunc1 as 'com.abc.udf1';
> CREATE TEMPORARY FUNCTION udfFunc2 as 'com.abc.udf2';
这种方式的好处是比较灵活,问题在于加载的UDF只存在于当前Session中,断开对话就消失了,下次使用还需要重新加载。
3.3.2 修改hive-site.xml文件
另一种方法是,修改参数hive.aux.jars.path
的值指向UDF文件所在的路径,该参数需要手动添加到hive-site.xml文件中。
1
2
3
4
<property>
<name>hive.aux.jars.path</name>
<value>file:///jarpath/all_new1.jar,file:///jarpath/all_new2.jar</value>
</property>
3.3.3 创建auxlib目录
比较简答的一种方法是,在${HIVE_HOME}
下创建auxlib目录,将UDF文件放到该目录中,这样hive在启动时会将其中的jar文件加载到classpath中。
4. 表操作
4.1 DDL 数据定义语言
4.1.1 create
创建
下面有几种常见造表操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
-- 造无分区表
CREATE TABLE IF NOT EXISTS test_table(
id int,
name string,
age int,
tel string
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\001" -- 列分隔符
LINES TERMINATED BY "\n" -- 行分隔符
ROW FORMAT SERDE "org.apache.hadoop.hive.ql.io.orc.OrcSerde" -- 序列化格式
STORED AS
INPUTFORMAT "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat" -- input格式
OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"; -- output格式
-- 造分区表
CREATE TABLE IF NOT EXISTS test_table(
id int,
name string,
age int,
tel string
)
PARTITIONED BY (y string, m string, d string)
ROW FORMAT DELIMITED
STORED AS TEXTFILE; -- 内置基本文件格式
-- 复制空表结构
CREATE TABLE IF NOT EXISTS newTable LIKE oldTable;
-- 通过已有表和数据造表并插入空值
CREATE TABLE IF NOT EXISTS newTable AS SELECT key, features FROM oldTable WHERE y='2019';
从已有表生成create-table-as-select(CTAS)
- CTAS创建的表是原子性的,这意味着,只有当该表所有查询完成后,用户才能看到完整的结果表。
- CTAS唯一的限制是目标表:即目标表不能是有个有分区的表,也不能是外部表。
三种内置存储格式
TEXTFILE
是最普通的文件存储格式,内容可以直接查看。SEQUENCENFILE
基于行存储,将表中的数据以二进制格式编码,并且支持数据压缩,可以节省存储空间。RCFILE
基于列存储,将表中的数据以二进制格式编码,并且支持压缩。 因为列值重复多,所以压缩效率高。占用磁盘存储空间小,io压力小。对包含大量字段的表而言,更适合RCFILE存储。
4.1.2 drop
删除
常见删表操作
1
DROP TABLE IF EXISTS testTable;
被 drop
或者 insert overwrite
的表会暂时被移动到HDFS的 hdfs://namenode/tmp/username/taskid/
路径下以供恢复。
4.1.3 alter
更改
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- 增加分区
ALTER TABLE testTable ADD PARTITION (y='2017') PARTITION (y='2016');
-- 增加分区,多个分区需要指定前后顺序
ALTER TABLE testTable ADD PARTITION (y='2017', m='03') location '/y=2017/m=03';
-- 删除分区
ALTER TABLE testTable DROP PARTITION (y='2017') PARTITION (y='2016');
-- 表重命名
ALTER TABLE old_table RENAME TO new_table;
-- 添加列
ALTER TABLE testTable ADD COLUMNS (extra_col1 int COMMENT 'New Col1',
extra_col2 int COMMENT 'New Col2');
-- 替换全部列
ALTER TABLE testTable REPLACE COLUMNS (new_col1 int COMMENT 'New Col1',
new_col2 int COMMENT 'New Col2');
4.1.4 load
装载数据
load
从外部文件载入数据并转化为Hive的分布式数据文件。
1
2
3
4
5
6
7
8
-- 往无分区表装载并覆盖
LOAD DATA INPATH 'path/data.csv' OVERWRITE INTO TABLE test_table;
-- 往指定分区装载并覆盖
LOAD DATA INPATH 'path/data.csv' OVERWRITE INTO TABLE test_table PARTITION (y='2019');
-- 往指定分区装载并追加
LOAD DATA INPATH 'path/data.csv' INTO TABLE test_table PARTITION (y='2019');
4.2 DML 数据操作语言
4.2.1 select
查询
1
2
3
4
5
6
7
8
9
10
11
-- 例子:统计不同年龄/性别组的人数
SELECT b.age, c.sex, count(*) FROM
(SELECT id FROM table_1) a
JOIN
(SELECT id, employee_id, age FROM table_2) b
ON a.id = b.id
JOIN
(SELECT employee_id, sex FROM table_3) c
ON b.employee_id=c.employee_id
GROUP BY b.age, c.sex
ORDER BY b.age, c.sex DESC;
select
嵌套查询会影响性能,有时候可以提速,有时候会降速,取决于mapper-reducer的分配机制。
查询关键词
-
distinct
去重distinct
对结果集去重,一般会启用MapReduce并行去重。distinct
一般会对全部选择字段去重,并不能针对其中部分字段去重。count distinct
去重统计会将reducer的数量强制限制为1,因而应该写为子查询提高效率。1 2 3 4 5 6
-- 罗列不同性别和年龄的组合 SELECT DISTINCT sex, age FROM test_table; -- 使用子查询优化,罗列不同性别和年龄的组合 SELECT COUNT(*) FROM (SELECT DISTINCT sex, age FROM test_table) tb;
-
group by
分组group by
条件可以使字段也可以是函数结果,会将数据分组再在每组内分别执行聚合函数。group by
依赖的字段必须是已有的或者在子查询内的,不能是在同一层查询的字段别名。group by
不能和distinc
同时使用。1 2 3 4 5
-- 按性别、年龄分组并统计人数 SELECT sex, age COUNT(*) FROM test_table GROUP BY sex, age; -- 随机分组并统计平均年龄 SELECT AVG(age) FROM test_table GROUP BY CAST(rand()*10 AS int);
-
order by
与sort by
排序order by
会对输出做全局排序,因此会触发一个reducer,当输出规模较大是,需要较长计算时间和存储空间。sort by
不是全局排序,其在数据进入reducer之前完成排序。可以通过set mapred.reduce.tasks=<numer>
设置reducer个数。结果集在单个reducer内有序但对全局未必有序,所以常辅以order by
优化性能。1 2 3 4 5 6 7 8
-- 按年龄全局倒序排序曲最年迈的10个人 SELECT id, age FROM test_table GROUP BY age DESC LIMIT 10; -- 通过sort by预筛选而优化order by性能,同时避免数据过大而无法order by SET mapred.reduce.tasks=10; SELECT id, age FROM SELECT id, age FROM test_table SORT BY age DESC LIMIT 10*10) t ORDER BY age DESC LIMIT 10;
-
limit
限制limit
结果集的顺序是随机的,和存储文件的顺序有关。limit
的执行顺序亚于order by
。
4.2.2 insert
插入
insert
的数据源是Hive内已经存在的表。
insert into
是在文件末尾追加数据,而insert overwrite
是覆盖全部数据文件。
讲一个表的数据拆分到多个分区需要注意合理使用insert
语法,避免多次扫描全表。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
-- 增加数据
INSERT INTO TABLE test_table SELECT * FROM source_table;
-- 覆盖原分区的数据
INSERT OVERWRITE TABLE test_table
PARTITION (y='2017')
SELECT * FROM source_table;
-- 一次扫描多路输出
FROM source_table st
INSERT OVERWRITE TABLE target_table
PARTITION (y='2017' AND m='03')
SELECT * WHERE st.y='2017' AND st.m='03'
INSERT OVERWRITE TABLE target_table
PARTITION (y='2017' AND m='04')
SELECT * WHERE st.y='2017' AND st.m='04'
INSERT OVERWRITE TABLE target_table
PARTITION (y='2017' AND m='05')
SELECT * WHERE st.y='2017' AND st.m='05';
4.2.3 join
连接
性能优化
- Hive的
join
只支持但条件或多条件的等值=
关联,不支持or
关联。 join
的结果集不能有重名字段。- reducer 会缓存
join
序列中除了最后一个表的所有表的记录,再通过最后一个表将结果序列化到文件系统,因此实践中应该把最大的那个表写在最后,否则会因为缓存浪费大量内存。 - 如果两张大宽表都有大量字段,但结果集仅保留少量字段,则建议用子查询优先筛选字段,避免占用太多缓存甚至是OOM任务失败。
1
2
3
4
5
6
7
8
9
-- 通过子查询过滤字段
SELECT a.id, b.age, c.sex FROM
(SELECT id FROM table_1) a
JOIN
(SELECT id,employee_id,age FROM table_2) b
ON a.id = b.id
JOIN
(SELECT employee_id, sex FROM table_3) c
ON b.employee_id = c.employee_id;
- 如果所有表之间没有公共关联键,则会触发多个MR stage依次执行,如果所有表之间存在同一公共关联键,则用最大表为左表可以让MR stage并行执行。
1
2
3
4
5
6
7
8
9
-- Parallel Join
SELECT a.id, b.age, c.sex FROM
(SELECT id FROM table_1) a
JOIN
(SELECT id,age FROM table_2) b
ON a.id = b.id
JOIN
(SELECT id, sex FROM table_3) c
ON a.id = c.id;
join
之前需要确保关联键是否去重,是不是刻意保留非去重结果,要警惕数据倾斜的发生。
数据倾斜
如果两张 10^8^ 数量级的表未对关联键去重,且每张表有10%的数据的关联键为空,则此部分脏数据就会形成10^14^ 数据量的文件,而导致Reducer阶段始终卡在99%。
1
2
3
4
5
6
-- 对关联键进行非NULL非空过滤,避免数据倾斜
SELECT a.id, a.age, b.sex FROM
(SELECT distinct id, age FROM table_1 WHERE id IS NOT NULL AND id <> '') a
JOIN
(SELECT distinct id, age FROM table_2 WHERE id IS NOT NULL AND id <> '') b
ON a.id = b.id;
3种类型的 join
join
: Hive 的join
默认是inner join
, 找出左右都可匹配的记录。left join
: 左连接,以左边表为准,逐条去右边表找可匹配的字段,如果有多条会依次列出。full outer join
: 包含两个表的连接结果,左表缺失或右表缺失的数据会填充NULL。
1
2
3
4
5
6
-- 左右两表连接取并集的sql模板
SELECT nvl(a.id, b.id) AS id, a.age, b.sex FROM
(SELECT id, age FROM table1) a
FULL OUTER JOIN
(SELECT id, sex FROM table2) b
ON a.id = b.id;
4.2.4 union
合并
union
与 union all
均基于列合并多张表的数据,所合并的列格式必须完全一致。
union
过程中会去重并降低效率,而 union all
直接追加数据
union
前后是两段 select
语句而非结果集
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-- 不去重合并两张表的数据
CREATE TABLE IF NOT EXISTS union_table as
SELECT * FROM
(
SELECT * FROM table_1
UNION ALL
SELECT * FROM table_2
)t;
-- 针对两个限制行数的子表进行union需要额外嵌套select
CREATE TABLE IF NOT EXISTS union_table as
SELECT * FROM
(
SELECT * FROM (SELECT * FROM table_1 LIMIT 100) tb1
UNION ALL
SELECT * FROM (SELECT * FROM table_2 LIMIT 200) tb2
)
4.2.5 导出数据
导出数据可以通过命令行模式或者交互模式实现
1
2
3
4
5
6
-- 交互模式
INSERT OVERWRITE LOCAL DIRECTORY 'path/data.csv'
SELECT * FROM test_table LIMIT 100;
-- 命令行模式
hive -v -e "SELECT * FROM test_table LIMIT 100;" >path/data.csv
(*)命令行模式中,会出现secureCRT中hive的显示错误(缺少空格,tab,数据黏连,数据缺少等),交互模式可以避免该问题。
4.3 常用函数汇总
4.3.1 !=
/<>
/=
比较
针对string类型的比较适合用 <>
不等与 =
相等, 针对数值类型的比较适合用 !=
与 =
。
判断是否是NULL适合直接用 is null
和 is not null
1
2
3
4
5
-- 过滤脏姓名数据
SELECT * FROM table1 WHERE name <> '' AND name IS NOT NULL;
-- 过滤脏年龄数据
SELECT * FROM table2 WHERE age > 0.0 AND age IS NOT NULL;
4.3.2 字符串函数
基本函数
concat(str1, str2)
进行字符串合并。length(str)
返回字符串长度。substr(str,0,12)
截取字符串从0位开始的12个字符。
匹配与正则函数
like
做普通文本匹配,rlike
做正则匹配,且在HIve2上不在支持针对int类型的匹配。regexp_replace
做正则替换,regexp_extract
做正则匹配。
1
2
3
4
5
6
-- 获取基金有关数据
SELECT * FROM test_table WHERE product_type LIKE "%基金%";
-- 将"2017-04-01 00:00:00"格式的时间先提取年月日再去除"-"符号,最后转为int
-- 0对应完整匹配结果,1/2/3则对应第1/2/3和()内的匹配结果
SELECT CAST(regexp_replace(regexp_extract(reg_time,"([0-9]+)-([0-9]+)-([0-9]+)",0),"-","") AS int) FROM test_table;
4.3.3 count
/sum
/avg
/max
/min
聚合函数
基本使用
聚合函数在同一层查询中可以组合使用,但不可能互相嵌套使用。
如果需要实现诸如max(avg(feature))的效果,则需要使用嵌套的 select
。
1
2
3
4
5
6
7
8
-- 简单统计年龄分布
SELECT COUNT(*), AVG(age) as avg_age, MAX(age) as max_age FROM test_table;
-- 统计不同性别的年龄分布
SELECT sex, COUNT(*), AVG(age) as avg_age, MAX(age) as max_age FROM test_table GROUP BY sex;
-- 结合case when使用统计有效数据量
SELECT COUNT(CASE WHEN income IS NOT NULL 1 ELSE NULL END) FROM income_record;
count
与 sum
的区别
sum
针对所有数值求和, count
将所有非NULL值记为1后统计。
1
2
-- 针对Value字段的有值率统计
SELECT COUNT(IF(value>0.0,1,NULL))/COUNT(*) FROM test_table;
4.3.4 case
条件函数
case
函数通常格式为 (case when condition1 then value1 else null end)
, 其中else
默认条件可省,end
关键词必不可省。
case when
中不能包含聚合函数,否则会报错。
1
2
3
4
5
6
7
8
9
10
11
12
-- 收入区间分组
SELECT id,
(CASE
WHEN CAST(income as float) < 50000 THEN '0~5万'
WHEN CAST(income as float) >= 50000 AND CAST(income as float) < 1000000 THEN '5~10万'
WHEN CAST(income as float) >= 100000 AND CAST(income as float) < 1500000 THEN '10~15万'
WHEN CAST(income as float) >= 150000 AND CAST(income as float) < 2000000 THEN '15~20万'
WHEN CAST(income as float) >= 200000 AND CAST(income as float) < 2500000 THEN '20~25万'
WHEN CAST(income as float) >= 250000 AND CAST(income as float) < 3000000 THEN '25~30万'
WHEN CAST(income as float) > 300000 THEN '30万以上'
ELSE NULL END)
FROM test_table;
4.3.5 if
条件函数
if
函数是简化版的 case
函数,且可以通过嵌套实现多分类。
1
2
-- 将年龄分为四段
SELECT id, IF(age>50, IF(age>75,"晚年","中老年"),IF(age>25,"青壮年","青少年")) FROM test_table;
4.3.6 in
范围函数
HIve不支持在 in
函数嵌套 select
语句。
1
2
-- 提取已知性别的数据
SELECT * FROM test_table WHERE sex IN("M","F");
4.3.7 where
过滤条件
where
过滤条件针对 group by
前的结果集筛选, 优先级高于 group by
1
2
3
4
-- 统计不同公司的男性平均年龄
SELECT company, AVG(age) FROM test_table
WHERE sex = "M"
GROUP BY company;
4.3.8 having
过滤条件
having
过滤条件针对 group by
后的结果集筛选,优先级低于 group by
1
2
3
4
5
-- 统计不同公司的男性平均年龄,且仅保留平均年龄在50岁以上的公司
SELECT company, AVG(age) FROM test_table
WHERE sex = "M"
GROUP BY company
HAVING AVGA(age) > 50;
4.3.9 nvl
空值填充函数
NVL(string1, replace_with)
用法,如果string1为NULL,则nvl
函数返回replace_with的值,否则返回string1的值。
1
2
-- 无收入的客户默认值为0
SELECT customer_id, NVL(salary,0) AS salary FROM test_table;
4.3.10 cast
类型转换
cast as
常用于string/int/double之间的类型转换
1
2
-- 把string类型保存的数字还原为int
SELECT CAST(value_string as INT) FROM test_table;
4.3.11 row_number
编号函数
1
2
3
4
5
-- 按照c2字段倒叙编号
SELECT *, row_number() OVER(ORDER BY c2 DESC) AS row_num FROM test_table;
-- 按照c1字段分组后再按c2字段倒叙编号
SELECT *, row_number() OVER(PARTITION BY c1 ORDER BY c2 DESC) AS row_num FROM test_table;
4.3.12 percentile
百分位函数
如果只查询一个分位, 则 percentile
的返回结果是数值, 如果查询多个分位, 则结果是string类型的array.
如果不加 where
条件限定, 则 percentile
的计算过程包含0值, 往往会造成结果偏移.
1
2
3
4
5
-- 获取income字段的10个分位值
SELECT percentile(CAST(income AS int), ARRAY(0.0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0)) AS income_percentiles FROM test_table;
-- 获取income段字Top10%的阈值
SELECT percentile(CAST(income AS int),0.9)) AS income_top10p_threshold FROM test_table;
4.3.13 to_date
时间函数
to_date()
时间函数可以把时间字符串转换成时间类型, 再计算时间差
常用日期提取函数包括 year(dstr)
/month(dstr)
/day(dstr)
/hour(dstr)
/minute(dstr)
/second(dstr)
日期运算函数包括 datediff(enddate,startdate)
/date_sub(startdate,days)
/date_add(startdate,days)
1
2
3
4
5
-- 转换时间数据的格式
SELECT to_date("1970-01-01 00:00")SELECT to_date("1970-01-01 00:00:00") AS start_time FROM test_table;
-- 计算所有数据到当前时间的天数差
SELECT datediff(to_date(from_unixtime(unix_timestamp())),record_date) from test_table;
4.3.14 split
字符串分隔函数
split(str,regex)
函数用于将string类型数据按regex提取, 分隔后转换为array.
数据拆分为多个列.
1
2
3
4
5
6
-- 以","为分隔符分隔字符串, 并转化为array
SELECT split("1,2,3", ",") AS value_array FROM test_table;
-- 结合array index, 将原始字符串分割成3列
SELECT value_array[0] AS value1, value_array[1] AS value2, value_array[2] AS value3 FROM
(SELECT split("1,2,3", ",") AS value_array FROM test_table)t;
4.3.15 explode
转置函数
explode
转置单列使用时可以把单列数据拆分成多行.
explode
转置单列并保留其他列时需要结合 lateral view
使用.
1
2
3
4
5
6
-- array(1,2,3)变成了3列
SELECT explode(ARRAY(1,2,3)) FROM test_table;
-- 保留主键列, 并将存储array(1,2,3)的array_col列转置
SELECT id, exploded_col FROM source_table
LATERAL VIEW explode(array_col) view_alias AS exploded_col;
4.3.16 with cube
分析功能
with cube
功能实现了笛卡尔积运算
如果对(a,b,c)字段组进行 group by with cube
, 则相当于 (a,b,c),(a,b),(a,c),(b,c),(a),(b),(c),()
这8种情况分别聚合统计.
1
2
3
-- 统计a,b,c这仨个字段在所有组合下的记录数
SELECT a,b,c,COUNT(*) FROM test_table
GROUP BY a,b,c WITH CUBE;
5. 其他操作
5.1 数据迁移
-
在新路径下创建一张相同结构的表,字段结构、
serde format
和stored as
类型必须完全一致create table migrated_table( id string, number int, key_double double, key_string string ) row format delimited fields terminated by '\t';
这种方式一般属于其他操作无法正常使用下的方法,当表过于复杂容易出错。
-
使用
distcp
指令进行迁移1 2 3
# -skipcrccheck 规避跨Hadoop版本迁移的数据检验 # hftp方式适用于跨集群迁移,hdfs适合集群内迁移 $ hadoop distcp -Dmapred.job.name=queueName -update -skipcrccheck hftp://domain:ip/user/sourceDestination user/targetDestination
-
如果迁移的是分区表,则需要预先用
alter table
指令造好分区文件夹,如果分区过多,则可以用distcp
分批复制分区数据
5.2 数据恢复
首先要明白几点,被 truncate
的数据是无法恢复的,但是被 drop
或者 insert overwrite
的表第一时间是可以恢复的。
恢复的具体操作如下:
-
建立一张相同格式的表
newTable
, 存储格式和字段类型必须完全一致。 -
从
tmp
目录中把数据复制出来:1
$ hadoop fs -cp /tmp/username/stageing_hive_YYYY-MM-DD_id/* path/databaseName.db/newTable/*
5.3 查询表更新情况
-
针对分区表可以直接查询最新分区来判断数据是否正确更新。
-
针对无分区表,
desc formatted
的结果往往不准确,所以要用Hadoop的指令去查询数据文件的更新时间。1
$ hadoop fs -ls path/databaseName.db/tableName/