HdfsImporter 使用说明
最后更新于:2018-09-13 17:12:23
在使用前,请先阅读数据模型和数据格式的介绍。
1. HdfsImporter 概述
HdfsImporter 与 BatchImporter 一样用于将历史数据或外部数据导入神策分析。
2. 运行环境
HdfsImporter 仅可在神策分析集群版使用,且只能运行在神策分析集群的 Hadoop 环境中。
3. 经典使用方法
一次新启动的导入步骤如下:
- 将数据置于神策分析集群上的 HDFS 中。例如 HDFS 集群 /data/input 目录,其中包含文件data01、data02、data03等文件,文件内容为每行一个符合数据格式的 Json。
- 切换到 sa_cluster 账户:
sudo su - sa_cluster
- 请确定 HDFS 上数据不再进行修改(增删修改文件)后 ,运行 HdfsImporter
使用方式 :
sh $SENSORS_ANALYTICS_HOME/tools/hdfs_importer/bin/hdfs_importer.sh \
--path /data/input \
--project default
备注:
- 自 1.10 版本起,直接运行
jar
包的方式不再被支持。 - 自 1.13 版本起,可直接用简略的
hdfs_importer
命令替换sh $SENSORS_ANALYTICS_HOME/tools/hdfs_importer/bin/hdfs_importer.sh
- 导入后清理数据较复杂,请检查好再操作。对同一份数据多次运行导入会导致数据重复。
- path 可以是一个包含子目录的数据目录。
- HdfsImporter 执行成功之后,数据大约会在 1分钟之后 被查询到。
4. 配置项说明
列出所有配置项:
sh $SENSORS_ANALYTICS_HOME/tools/hdfs_importer/bin/hdfs_importer.sh --help
参数 | 必填 | 说明 | 默认值 |
---|---|---|---|
path | 是 | 需要导入的数据所在的 HDFS 路径。 | |
project | 否 | 期望导入的 project 名称,不指定的则导入到 default。 | default |
all_data_without_track_signup | 否 | 是否所有数据(包括之前各种类型导入)都不包含 track_signup 类型数据。若所有导入数据均不包含 track_signup 类型数据,添加这个选项可以提高导入速度。注意,使用过用户关联功能则不能使用该选项。 | |
split_max_size_mb | 否 | MapReduce 每个 Mapper 处理分片的最大值。该值会影响 Mapper 数量。 | 256 |
job_queue_name | 否 | MapReduce Job 的队列名。 | |
split_reduce_size_mb | 否 | MapReduce 每个 Reducer 处理的数据量大小。该值会影响 Reducer 的数据。Reducer 个数 = 输入的总数据量 / split_reduce_size_mb 。 | 2048 |
mapper_max_memory_size_mb | 否 | MapReduce 每个 Mapper 使用的内存最大值。 | 1024 |
reduce_max_memory_size_mb | 否 | MapReduce 每个 Reducer 使用的内存最大值。 | 2048 |
reduce_output_file_size_mb | 否 | MapReduce 每个 Reducer 输出的文件最大值。 | 256 |
shuffle_partition_split_size | 否 | Shuffer 时,每个 Mapper 切分 partition 的大小,一般用来调整数据分布不均匀 | 500000 |
expired_record_filter_after_hour | 否 | 允许导入的数据时间区间截至未来的小时数,默认为 1,即超过未来 1 小时的数据将被过滤。 | 1 |
expired_record_filter_before_hour | 否 | 允许导入的数据时间区间向前的小时数,默认为 17520,即超过 2 年以前的数据将被过滤。 | 17520 |
user_define_job_name | 否 | 允许自定义 job name,比如在 HdfsImpoter-12345-[your_name] 中,your_name 是自定义的部分 | 无 |
debug | 否 | 开始 debug 模式,数据只会以 JSON 文本的方式输出到相应的 HDFS 目录下,并且不会真正的导入到神策系统当中。 |
|
disable_import_succeeded_source_path | 否 | 禁止重复导入已成功过的导入路径 | false |
write_import_info | 否 | 在命令执行路径生成 import info 文件 | false |
一个复杂的例子:
sh $SENSORS_ANALYTICS_HOME/tools/hdfs_importer/bin/hdfs_importer.sh \
--path /data/input \
--project test_project \
--all_data_without_track_signup \
--split_max_size_mb 128 \
--job_queue_name data_import_queue \
--split_reduce_size_mb 2048 \
--reduce_max_memory_size_mb 2048 \
--reduce_output_file_size_mb 256 \
--disable_import_succeeded_source_path
5. 查询导入历史
自1.13 版本,支持查询 hdfs importer 导入历史, 查询结果以 JSON 格式输出。
使用方式 :
sh $SENSORS_ANALYTICS_HOME/tools/hdfs_importer/bin/hdfs_importer.sh list \
--project default
列出所有配置项 :
sh $SENSORS_ANALYTICS_HOME/tools/hdfs_importer/bin/hdfs_importer.sh list --help
参数 | 必填 | 说明 | 默认值 |
---|---|---|---|
project_name | 否 | project name, 可指定按照 project name 查询导入任务记录 | |
session_id | 否 | session id, 可指定按照 session id 查询导入任务记录 | |
job_name | 否 | job name,可指定按照 job_name 查询导入任务记录。这里的 job_name 为导入任务中的 user_define_job_name | |
status | 否 | hdfs importer 导入任务的状态,status 值可为 WAITING,RUNNING,SUCCESS,FAILED | |
start_time | 否 | start time, 可查询startTime比传入的指定时间晚的导入记录, 格式%Y-%m-%d %H:%M:%S | |
max_num | 否 | recent N times,可指定最近N次导入记录 | 10 |
dump | 否 | 查询结果输出到指定的FILENAME;如果没有指定,则会输出到控制台 | |
full | 否 | 指定后则查看所有任务,否则同一个session_id只查看最新的任务 | False |
例如,查询按小时的例行导入任务:
sh $SENSORS_ANALYTICS_HOME/tools/hdfs_importer/bin/hdfs_importer.sh list \
--project_name test_project \
--job_name hourly_import \
--status SUCCESS \
--start_time '2018-08-30 00:00:00' \
--max_num 2 \
--dump /data/work/list_output_file1
输出到文件中的结果为:
[
{
"id": 12,
"session_id": 320,
"job_name": "hourly_import",
"scheduler_job_record_id": null,
"import_path": "/sa/tmp/hdfsImport",
"parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}",
"start_time": "2018-09-11 18:46:50",
"end_time": "2018-09-11 18:49:46",
"counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0,\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}",
"log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-43c7d4ea-0b14-48f6-8b03-764178e927ae.log",
"event_job_id": "job_1536029076029_3074",
"profile_job_id": "job_1536029076029_3077",
"event_job_status": "SUCCESS",
"profile_job_status": "SUCCESS",
"event_data_load_status": "SUCCESS",
"project_name": "test_project"
},
{
"id": 10,
"session_id": 317,
"job_name": "hourly_import",
"scheduler_job_record_id": null,
"import_path": "/sa/tmp/hdfsImport",
"parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}",
"start_time": "2018-09-11 10:23:20",
"end_time": "2018-09-11 10:26:21",
"counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0,\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}",
"log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-67a00f94-67d8-415e-a004-c9ca82a17a2a.log",
"event_job_id": "job_1536029076029_3044",
"profile_job_id": null,
"event_job_status": "SUCCESS",
"profile_job_status": null,
"event_data_load_status": "SUCCESS",
"project_name": "test_project"
}
]
再如,查询指定 session_id 的所有导入任务状态:
sh $SENSORS_ANALYTICS_HOME/tools/hdfs_importer/bin/hdfs_importer.sh list \
--session_id 306 \
--full
输出到控制台的结果为
[
{
"id": 8,
"session_id": 306,
"job_name": "hourly_import",
"scheduler_job_record_id": null,
"import_path": "/sa/tmp/hdfsImport",
"parameters": "{\"session\":\"HdfsImport-306\"}",
"start_time": "2018-09-10 19:02:08",
"end_time": "2018-09-10 19:02:46",
"counter_info": "{\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}",
"log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-0634112a-6b90-40db-a26d-5492dbc7b995.log",
"event_job_id": null,
"profile_job_id": "job_1536029076029_3084",
"event_job_status": null,
"profile_job_status": "SUCCESS",
"event_data_load_status": null,
"project_name": "test_project"
},
{
"id": 7,
"session_id": 306,
"job_name": "hourly_import",
"scheduler_job_record_id": null,
"import_path": "/sa/tmp/hdfsImport",
"parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}",
"start_time": "2018-09-10 18:58:45",
"end_time": "2018-09-10 19:01:10",
"counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0}",
"log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-c7514335-5a55-42b8-bfd3-0ad7a27ec1a3.log",
"event_job_id": "job_1536029076029_3082",
"profile_job_id": "job_1536029076029_3083",
"event_job_status": "SUCCESS",
"profile_job_status": "FAILED",
"event_data_load_status": "SUCCESS",
"project_name": "test_project"
}
]
6. 更新日志
2018-09-13 (SA 版本号:1.13)
- 导入配置项中增加参数
disable_import_succeeded_source_path
- 导入配置项中增加参数
write_import_info
- 可使用简略的
hdfs_importer
命令替换sh $SENSORS_ANALYTICS_HOME/tools/hdfs_importer/bin/hdfs_importer.sh
- 新增查询导入历史子命令
2017-11-29 (SA 版本号:1.10.3329)
- 支持自定义 job_name 参数:
user_define_job_name
2017-07-10 (SA 版本号:1.7.2628)
- 新增 Mapper 内存参数
mapper_max_memory_size_mb
- 支持用户自定义 InputFormat,详情请参考 github :HDFSImporter 自定义 InputFormat
2017-07-04 (SA 版本号:1.7.2614)
- 支持用户自定义预处理模块,详情请参考 github :HDFSImporter 预处理模块
- 新增
debug
模式