Hive

Hive的使用

Hive QL

Hive 的 sql “方言”, 称为 HiveQL。

SQL 和 HiveQL 的概要比较

特性 SQL HiveQL
更新 UPDATA INSERT
事务 支持 支持(表级和分区级)
索引 支持 支持
延迟 亚秒级 分钟级
数据类型 整数、浮点数、定点数、文本和二进制串、时间 整数、浮点数、布尔值、文本和二进制串、时间戳、数组、映射、结构
函数 数百个内置函数 几十个内置函数
多表插入 不支持 支持
Create table as select 一部分支持 支持
选择 SQL-92 FROM子句中只能有一个表或者视图。支持偏序的SORT BY。 可限制返回行数量的LIMIT
子查询 在任何字句中支持相关的或者不相关的 只能在FROM子句中(不支持相关子查询)
视图 可更新 只读
拓展点 用户自定义函数、存储过程 用户自定义函数、MapReduce脚本

数据类型

类别 类型 描述 文字示例
基本数据类型 TINYINT 1B有符号整数,-128~127 1
基本数据类型 SMALLINT 2B有符号整数,-32768~32767 1
基本数据类型 INT 4B有符号整数,-2147483648~2147483647 1
基本数据类型 BIGINT 8B有符号整数,-9223372036854775808~9223372036854775807 1
基本数据类型 FLOAT 4B单精度浮点数 1.0
基本数据类型 DOUBLE 8B双精度浮点数 1.0
基本数据类型 BOOLEAN true/false true
基本数据类型 STRING 字符串 ‘a’,”a”
基本数据类型 BINARY 字节数组
基本数据类型 TIMESTAMP 精度到纳秒的时间戳 1325502245000,’2012-01-02 03:04:05.123456789’
复杂数据类型 ARRAY 一组有序字段。字段的类型必须相同 array(1,2)
复杂数据类型 MAP 一组无序的键值对。见得类型必须是原子的;值可以是任何类型。同一个映射的键的类型必须相同,值的类型也必须相同 map(‘a’,1,’b’,2)
复杂数据类型 STRUCT 一组命名的字段,字段的类型可以不同 struct(‘a’,1,1.0)
  1. array(), map(), struct()三个函数都是HIVE的内置函数
  2. 列命名col1, col2, col3等

复杂类型

1
2
3
4
5
CREATE TABLE complex (
col1 ARRAY<INT>,
col2 MAP<STRING, INT>,
col3 STRUCT<a:STRING, b:INT, c:DOUBLE>
);
1
2
hive> SELECT col1[0], col2['b'], col3.c FROM complex;
1 2 1.0

操作与函数

函数分为几大类(在hive shell中输入SHOW FUNCTION可以得到):

  • 数学和统计函数
  • 字符串函数
  • 日期函数
  • 条件函数
  • 聚集函数
  • 处理XML和JSON函数
1
2
3
-- 获取某个函数帮助
hive> DESCRIBE FUNCTION length;
length(str|binary) - Returns the length of str or number of bytes in binary data

类型转换

隐式转换:
所有整数、float、string 都能转换成double
timestamp能转换为String
显示转换(cast):
CASE(‘1’ AS INT) 将字符串1 转为 1
CASE(‘x’ AS INT) 强转失败,返回 NULL

hive 的表在逻辑上由存储的数据和描述表中的数据形式的相关元数据组成。
数据一般存放在HDFS上或者其他Hadoop文件系统中。
元数据存放在关系数据库中,比如mySql。

托管表和外部表

托管表:

1
2
CREATE TABLE managed_table (dummy STRING);
LOAD DATA INPATH '/user/tom/data.txt' INTO table managed_table;

把文件 hdfs://user/tom/data.txt 移动到 managed_table的仓库目录中,即hdfs://user/hive/warehouse/managed_table。

1
2
--删除
DROP TABLE managed_table;

外部表

1
2
3
CREATE EXTERNAL TABLE external_table (dummy STRING)
LOCATION '/user/tom/external_table';
LOAD DATA INPATH 'user/tom/data.txt' INTO TABLE external_table;

hive 不会移动数据到数据仓库,也不会检查目录是否存在,删除表时也只会删除元数据。

分区和桶

分区是在创建表的时候用PARTIONED BY 子句定义的。

1
2
CREATE TABLE logs (ts BIGINT, line STRING)
PARTITION BY (dt STRING, country STRING);

加载数据时要指定分区

1
2
3
LOAD DATA LOCAL INPATH 'input/hive/partitions/file1'
INTO TABLE logs
PARTITION (dt='2017-01-01', country='GB');

显示分区

1
hive> SHOW PARTITIONS logs;

按分区查询

1
2
3
SELECT ts, dt, line
FROM logs
WHERE country='GB';

1
2
CREATE TABLE bucketed_users (id INT, name STRING)
CLUSTERED BY (id) INTO 4 BUCKETS;

对id进行hash取模,放到4个桶中

1
2
3
--对数据取样
hive> SELECT * FROM bucketed_users
> TABLESAMPLE(BUCKET 1 OUT OF 4 ON id);

这样大约得到1/4的数据

存储格式

HIVE从两个维度对表的存储进行管理:

  • 行格式 SerDe(序列化和反序列化工具)
  • 文件格式
    默认行内分隔符 ^A
    集合元素的默认分隔符 ^B, 用于分割ARRAY STRUCT 或者 MAP的键值对中的元素

CREATE TABLE … 默认为:

1
2
3
4
5
6
7
CREATE TABLE ...
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

二进制存储格式: 顺序文件、 Avro数据文件以及RCFile

导入数据

INSERT 语句

1
2
3
INSERT OVERWRITE TABLE target
SELECT col1,col2
FROM source;

对于分区的表,可以使用PARTITION子句来指明数据要插入哪个分区

1
2
3
4
INSERT OVERWRITE TABLE target
PARTITION (dt='2017-01-01')
SELECT col1,col2
FROM source;

多表插入

1
2
3
4
5
6
7
8
9
10
11
12
FROM records2
INSERT OVERWRITE TABLE stations_by_year
SELECT year, COUNT(DISTINCT station)
GROUP BY year
INSERT OVERWRITE TABLE records_by_year
SELECT year, COUNT(1)
GROUP BY year
INSERT OVERWRITE TABLE good_records_by_year
SELECT year,COUNT(1)
WHERE temperature != 9999
And quality = 0 OR quality = 1
GROUP BY year;

CTAS

CTAS 的操作是原子的

1
2
3
4
CREATE TABLE target
AS
SELECT col1, col2
FROM source;

表的修改

重命名

1
ALTER TABLE source RENAME TO target;

添加一个新的列

1
ALTER TABLE target ADD COLUMNS (col3 STRING);

表的丢弃

  • DROP TABLE 删除表的数据和元数据
  • dfs -rmr /user/hive/warehouse/my_table 删除数据
  • CREATE TABLE new_table LIKE existing_table; 创建一个与第一个表模式相同的新表

查询数据

使用SELECT语句的各种形式从Hive中检索数据

排序和聚集

在Hive中可以使用ORDER BY 子句对数据进行排序,但是ORDER BY 只通过一个reducer来做到这点。
SORT BY为每个reducer产生一个排序文件。
DISTRIBUTE BY 控制特定行应该到那个reducer

1
2
3
4
5
6
7
8
9
hive> FROM records2
> SELECT year, temperature
> DISTRIBUTE BY year
> SORT BY year ASC, temperature DESC;
1949 111
1949 78
1950 22
1950 0
1950 -11

MapReduce脚本

1
2
3
4
5
6
7
8
9
10
hive> ADD FILE /user/tom/book_workspace/scr.py;
hive> FROM records2
> SELECT TRANSFORM(year, temperature, quality)
> USING 'scr.py'
> AS year, temperature;
1950 0
1950 22
1950 -11
1949 111
1949 78

连接

内连接

1
2
hive> SELECT sales.*, things.*
> FROM sales JOIN things ON (sales.id = things.id);

外连接

left outer join

1
2
hive> SELECT sales.*, things.*
> FROM sales LEFT OUTER JOIN things ON (sales.id = things.id);

除此之外还有一种”全外连接”(full outer join) “右外连接”(right outer join)

半连接

1
2
hive> SELECT *
> FROM things LEFT SEMI JOIN sales ON (sales.id = things.id);

右表只能在ON子句中出现

map连接

1
2
hive> SELECT /*+ MAPJOIN(things) */ sales.*, things.*
> FROM sales JOIN things ON (sales.id = things.id);

如果用map进行桶连接,设置
SET hive.optimize.bucketmapjoin=true;

子查询

hive 的子查询只能出现from子句中

视图

1
2
3
4
5
6
CREATE VIEW v_records
AS
SELECT *
FROM records2
WHERE temperature != 9999
AND (quality = 0 OR quality = 1);

创建视图并不执行查询,只有用到的时候才会执行。

用户自定义函数

UDF分为三种

  • 普通UDF 作用于单个数据行,产生一个数据行
  • UDAF 接受多个输入数据行,产生一个数据行
  • UDTF 作用于单个数据行,产生多个数据行

写UDF

一个UDF必须:

  1. 是org.apache.hadoop.hive.ql.exec.UDF的子类
  2. 至少实现一个evaluate()方法

返回String类型时最好使用可以对象重用的TEXT类

写好后打包
ADD JAR /path/to/hive-examples.jar;

起个别名
CREATE TEMPORARY FUNCTION function_name AS ‘com.hadoopbook.hive.function_name’;

只对临时hive会话有效

写UDAF

UDAF

一个计算函数必须实现下面5个方法

  • init() 负责初始化计算函数并重设它的内部状态
  • interate() 每次对一个新值进行聚集计算都会调用iterate()方法
  • terminatePartial() Hive 需要部分聚集结果是会调用 terminatePartial()方法
  • merge() 在Hive决定要合并一个部分聚集值和另一个部分聚集值时会调用merge()方法
  • terminate() Hive需要最终聚集结果时会调用terminate()方法

更加复杂的返回类型可以定义一个数据类来实现。