今回は Apache Hive の Partition 機能を使ってみる。 Partition 機能を用いない場合、クエリを発行するとテーブルを構成するファイル群にフルスキャンがかかる。 それに対し、Partition 機能を用いるとクエリによってはスキャンするファイルの範囲を制限できる。 結果としてパフォーマンスの向上が見込める場合がある。
使った環境は次の通り。 Apache Hive や Hadoop のインストール部分については省略する。
$ cat /etc/redhat-release CentOS Linux release 7.4.1708 (Core) $ uname -r 3.10.0-693.5.2.el7.x86_64 $ hadoop version Hadoop 2.8.3 Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r b3fe56402d908019d99af1f1f4fc65cb1d1436a2 Compiled by jdu on 2017-12-05T03:43Z Compiled with protoc 2.5.0 From source with checksum 9ff4856d824e983fa510d3f843e3f19d This command was run using /home/vagrant/hadoop-2.8.3/share/hadoop/common/hadoop-common-2.8.3.jar $ hive --version Hive 2.3.2 Git git://stakiar-MBP.local/Users/stakiar/Desktop/scratch-space/apache-hive -r 857a9fd8ad725a53bd95c1b2d6612f9b1155f44d Compiled by stakiar on Thu Nov 9 09:11:39 PST 2017 From source with checksum dc38920061a4eb32c4d15ebd5429ac8a
下準備として Hive の CLI を起動しておく。
$ hive hive> set hive.cli.print.header=true;
Partition 機能を使わない場合
まずは Partition 機能を使わない場合について見ておこう。
例としてユーザがログインしたときのイベントを記録するテーブルを用意する。
hive> CREATE TABLE login_events ( > datetime TIMESTAMP, > name STRING > ); OK Time taken: 0.052 seconds
上記のテーブルにレコードを追加してみよう。
hive> INSERT INTO login_events > VALUES ("2018-01-01 10:00:00", "Alice"); ... OK _col0 _col1 Time taken: 22.584 seconds
一件のレコードがテーブルに保存された。
hive> SELECT * FROM login_events; OK login_events.datetime login_events.name 2018-01-01 10:00:00 Alice Time taken: 0.148 seconds, Fetched: 1 row(s)
このとき、実際にテーブルのデータが格納される場所は SHOW CREATE TABLE
を使って確認できる。
具体的には LOCATION
のところ。
hive> SHOW CREATE TABLE login_events; OK createtab_stmt CREATE TABLE `login_events`( `datetime` timestamp, `name` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 'hdfs://master:9000/user/hive/warehouse/login_events' TBLPROPERTIES ( 'transient_lastDdlTime'='1514916341') Time taken: 0.026 seconds, Fetched: 13 row(s)
今回であれば HDFS 上の /user/hive/warehouse/login_events
というパスに格納されていることが分かる。
hdfs
コマンドを使って上記のディレクトリを確認してみよう。
たしかに、先ほど追加したデータが書き込まれたテキストファイルがある。
$ hdfs dfs -ls /user/hive/warehouse/login_events Found 1 items -rwxrwxr-x 2 vagrant supergroup 26 2018-01-02 18:26 /user/hive/warehouse/login_events/000000_0 $ hdfs dfs -cat /user/hive/warehouse/login_events/000000_0 2018-01-01 10:00:00Alice
さて、上記のテーブルは Partition 機能を使っていない。 そのため、以下のような時刻を制限するようなクエリを実行したときも、ファイル群のフルスキャンが必要になってしまう。
hive> SELECT * > FROM login_events > WHERE datetime > "2018-01-01 00:00:00"; OK login_events.datetime login_events.name 2018-01-01 10:00:00 Alice Time taken: 0.169 seconds, Fetched: 1 row(s)
フルスキャンが走るとデータが増えたときに実行時間がかかるので、それを避けたいというのが今回の本題になる。
一旦、上記のテーブルを削除しておこう。
hive> DROP TABLE login_events; OK Time taken: 0.143 seconds
Table-by-Day アンチパターン
Partition 機能の説明に入る前に Table-by-Day というアンチパターンについて見ておこう。 これは、フルスキャンに時間のかかるテーブルに対処する方法として使われることのあるやり方だ。 具体的には、日付ごとの単位などでテーブルを分割して作ることでスキャンの範囲を狭めようというもの。
hive> CREATE TABLE login_events_20180101 ( > datetime TIMESTAMP, > name STRING > ); OK Time taken: 0.107 seconds hive> CREATE TABLE login_events_20180102 ( > datetime TIMESTAMP, > name STRING > ); OK Time taken: 0.046 seconds
もちろんデータはテーブルに応じて格納することになる。
hive> INSERT INTO login_events_20180101 > VALUES ("2018-01-01 10:00:00", "Alice"); ... OK _col0 _col1 Time taken: 23.413 seconds hive> INSERT INTO login_events_20180102 > VALUES ("2018-01-02 20:00:00", "Bob"); ... OK _col0 _col1 Time taken: 25.033 seconds
ただ、このやり方を取ると日付をまたいだ集計をするときなんかにクエリが複雑化してイマイチな感じ。
hive> SELECT * FROM login_events_20180101 > UNION ALL > SELECT * FROM login_events_20180102; ... _u1.datetime _u1.name 2018-01-01 10:00:00 Alice 2018-01-02 20:00:00 Bob Time taken: 26.36 seconds, Fetched: 2 row(s)
Apache Hive では Table-by-Day よりも Partition 機能を使うことが推奨されている。
Partition 機能を使った場合
続いて Partition 機能を使ってみる。
今度はテーブルを作るときに PARTITIONED BY
で年月日 (year, month, day) を元にパーティションを構成する。
hive> CREATE TABLE login_events ( > datetime TIMESTAMP, > name STRING > ) > PARTITIONED BY (year INT, month INT, day INT); OK Time taken: 0.049 seconds
datetime
を直接使えないの?という疑問が浮かぶと思うけど残念ながら難しそう。
レコードを追加するときは、次のようにカラムの値と一緒にパーティションの値も指定することになる。
hive> INSERT INTO login_events > PARTITION (year=2017, month=1, day=1) > VALUES ("2018-01-01 10:00:00", "Alice"); ... _col0 _col1 Time taken: 24.017 second
さて、それでは先ほどと同じように HDFS 上でデータがどのように格納されているかを確認してみよう。
hive> SHOW CREATE TABLE login_events; OK createtab_stmt CREATE TABLE `login_events`( `datetime` timestamp, `name` string) PARTITIONED BY ( `year` int, `month` int, `day` int) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 'hdfs://master:9000/user/hive/warehouse/login_events' TBLPROPERTIES ( 'transient_lastDdlTime'='1514917075') Time taken: 0.048 seconds, Fetched: 17 row(s)
格納されているディレクトリを見てみると、パーティションの値に応じてディレクトリが作られていることが分かる。
$ hdfs dfs -ls /user/hive/warehouse/login_events Found 1 items drwxrwxr-x - vagrant supergroup 0 2018-01-02 18:18 /user/hive/warehouse/login_events/year=2017 $ hdfs dfs -find /user/hive/warehouse/login_events /user/hive/warehouse/login_events /user/hive/warehouse/login_events/year=2017 /user/hive/warehouse/login_events/year=2017/month=1 /user/hive/warehouse/login_events/year=2017/month=1/day=1 /user/hive/warehouse/login_events/year=2017/month=1/day=1/000000_0
そう、つまり Partition 機能というのは格納されるディレクトリを分割することで実現されている。
新しくレコードを追加したときも確認してみよう。 今度はパーティションの値が先ほどとは異なる。
hive> INSERT INTO login_events > PARTITION (year=2017, month=1, day=2) > VALUES ("2018-01-02 20:00:00", "Bob"); ... OK _col0 _col1 Time taken: 20.819 seconds
同様に、新しくパーティションに対応するディレクトリが作成された。
$ hdfs dfs -find /user/hive/warehouse/login_events /user/hive/warehouse/login_events /user/hive/warehouse/login_events/year=2017 /user/hive/warehouse/login_events/year=2017/month=1 /user/hive/warehouse/login_events/year=2017/month=1/day=1 /user/hive/warehouse/login_events/year=2017/month=1/day=1/000000_0 /user/hive/warehouse/login_events/year=2017/month=1/day=2 /user/hive/warehouse/login_events/year=2017/month=1/day=2/000000_0
この状態でパーティションを使って条件式を組むとファイルのスキャン範囲がディレクトリ内に限定されるという寸法らしい。
hive> SELECT * > FROM login_events > WHERE year = 2017 AND month = 1 AND day = 1; OK login_events.datetime login_events.name login_events.year login_events.month login_events.day 2018-01-01 10:00:00 Alice 2017 1 1 Time taken: 0.389 seconds, Fetched: 1 row(s)
なるほど、それなら条件によってはクエリのパフォーマンスが向上するだろう、という感じ。
一旦、またテーブルを削除しておこう。
hive> DROP TABLE login_events; OK Time taken: 0.499 seconds
既存のテーブルにパーティションを追加する
続いては、既存の Partition 機能を使っていないテーブルにパーティションを追加したいというユースケースについて。 ようするに、最初は余裕だったけど時間とともにデータが増えてきてやばっ、というような場合。
まずは Partition 機能を使わない状態でテーブルを作ってレコードを追加しておこう。
hive> CREATE TABLE login_events ( > datetime TIMESTAMP, > name STRING > ); OK Time taken: 0.156 seconds hive> INSERT INTO login_events > VALUES > ("2018-01-01 10:00:00", "Alice"), > ("2018-01-02 20:00:00", "Bob"), > ("2018-01-03 06:00:00", "Carol"); OK _col0 _col1 Time taken: 22.689 seconds
あらかじめ、ここからの作業の流れについて概要を説明しておく。 残念なことに、既存のテーブルに直接パーティションのカラムを追加することはできない。 そこで、まずは既存のテーブルにパーティションを追加した形で新しいテーブルを用意しておく。 そして、その新しいテーブルに既存のテーブルのデータを移し替えることになる。
Partition に使う値は、元々あった datetime
カラムから生成することにしよう。
先ほどはパーティションを year > month > day と階層構造にしたけど、今度は一つにまとめてしまうことにする。
hive> SELECT date_format(datetime, "yyyy-MM-DD") FROM login_events; OK _c0 2018-01-01 2018-01-02 2018-01-03 Time taken: 0.253 seconds, Fetched: 3 row(s)
データを移行する先の、パーティションを追加した新しいテーブルを partitioned_login_events
という名前で用意する。
hive> CREATE TABLE partitioned_login_events ( > datetime TIMESTAMP, > name STRING > ) > PARTITIONED BY (day STRING); OK Time taken: 0.137 seconds
これから実施する作業は Dynamic Partition と呼ばれる機能を使うことになる。 そこで、あらかじめその機能を有効にしておこう。
hive> SET hive.exec.dynamic.partition=true; hive> SET hive.exec.dynamic.partition.mode=nonstrict;
Dynamic Partition 機能というのは、既存のテーブルに存在するカラムの値から自動的にパーティションを構成する機能をいう。 この機能を使わないと、自分で一つ一つパーティションを指定しながらデータの移行作業をする羽目になってつらい。
実際にデータの移行を実施するクエリは次の通り。
login_events
から partitioned_login_events
に対して INSERT OVERWRITE
でデータを移行している。
その際に SELECT
で作成した day
カラムをパーティションに指定しているのがポイント。
hive> FROM login_events > INSERT OVERWRITE TABLE partitioned_login_events > PARTITION(day) > SELECT datetime, name, date_format(datetime, "yyyy-MM-DD") AS day; ... OK datetime name day Time taken: 21.565 seconds
データがどのように格納されたかを確認してみよう。
hive> SHOW CREATE TABLE partitioned_login_events; OK createtab_stmt CREATE TABLE `partitioned_login_events`( `datetime` timestamp, `name` string) PARTITIONED BY ( `day` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 'hdfs://master:9000/user/hive/warehouse/partitioned_login_events' TBLPROPERTIES ( 'transient_lastDdlTime'='1514918693') Time taken: 0.02 seconds, Fetched: 15 row(s)
ちゃんと日付に応じてディレクトリが作られており Partition 機能が有効に働いていることが分かる。
$ hdfs dfs -find /user/hive/warehouse/partitioned_login_events /user/hive/warehouse/partitioned_login_events /user/hive/warehouse/partitioned_login_events/day=2018-01-01 /user/hive/warehouse/partitioned_login_events/day=2018-01-01/000000_0 /user/hive/warehouse/partitioned_login_events/day=2018-01-02 /user/hive/warehouse/partitioned_login_events/day=2018-01-02/000000_0 /user/hive/warehouse/partitioned_login_events/day=2018-01-03 /user/hive/warehouse/partitioned_login_events/day=2018-01-03/000000_0
あとは元々の名前にテーブルをリネームするなりご自由に。
まとめ
- Apache Hive は、そのままだとテーブルを構成するファイル群をフルスキャンする
- スキャン範囲を限定する方法には Table-by-Day というアンチパターンがある
- Apache Hive では代わりに Partition 機能を使うことが推奨されている
- Partition 機能はデータを格納するディレクトリを分割することで実現されている
- 作者: Edward Capriolo,Dean Wampler,Jason Rutherglen,佐藤直生,嶋内翔,Sky株式会社玉川竜司
- 出版社/メーカー: オライリージャパン
- 発売日: 2013/06/15
- メディア: 大型本
- この商品を含むブログ (3件) を見る