本文共 2820 字,大约阅读时间需要 9 分钟。
标签(空格分隔): 大数据平台构建
- 一: Streamset 简介与系统环境介绍
- 二: 安装软件准备
- 三: 在CDH5.14.4 集成使用StreamSets
- 四: streamsets 基本使用案例运行
1.1: StreamSet 简介
StreamSets由Informatica前首席产品官Girish Pancha和Cloudera前开发团队负责人Arvind Prabhakar于2014年创立。他们成立该公司主要是应对来自动态数据(data in motion)的挑战 - 包括数据源,数据处理和数据本身,这是一个称为“数据漂移“(https://streamsets.com/reports/data-drift/)的问题。StreamSets设想从头开始管理数据流,避免已有产品和工具的缺陷,并启用一种管理动态数据(data in motion)的新方法。最新的产品StreamSets Dataflow Performance Manager,也叫DPM,主要用于构建端到端的数据流。DPM是一个运行控制中心,可以让你映射(数据流),内置的测量和监测确保持续的数据传输和控制动态数据(data in motion)的性能。首先,它将你不同的数据流映射到支持你的每个关键业务流程的拓扑中。然后监测这些拓扑的日常运行情况,根据掌握的性能情况,以满足应用的SLA为目标,确保你始终提供及时和可信的数据。StreamSet的架构处理数据流程
下载:StreamSet https://archives.streamsets.com/index.html 下载文件: STREAMSETS-3.0.0.0.jar STREAMSETS_DATACOLLECTOR-3.0.0.0-el7.parcel manifest.json
yum install -y httpd* service httpd start chkconfig httpd on mkdir -p /var/×××w/html/streamsetscp -p STREAMSETS-3.0.0.jar /opt/cloudera/csd/chown cloudera-scm:cloudrea-scm -R /opt/cloudera/csd/mv manifest.json /var/×××w/html/streamset/mv STREAMSETS_DATACOLLECTOR-3.3.0-el7.parcel /var/×××w/html/streamset/从启cdh的CM 服务器service cloudera-scm-server restart
重启cloudera-scm-server cd /etc/init.d/./cloudera-scm-server restart
默认用户名:admin 密码:admin
准备工作:从官网下载测试数据https://×××w.streamsets.com/documentation/datacollector/sample_data/tutorial/nyc_taxi_data.csv
创建测试目录并赋予权限:mkdir -p /flyfish/test_streammkdir /flyfish/test_stream/datamkdir /flyfish/test_stream/errormkdir /flyfish/test_stream/outchmod -R 777 /flyfish/test_stream
将测试数据拷贝到 /flyfish/test_stream/data 目录下cp -p nyc_taxi_data.csv /flyfish/test_stream/data
点击dataFormat 标签,修改选择如下选择
预览文件
添加流选择器 ${record:value('/payment_type') == 'CRD'}
脚本放在 Jython >configuration>Jython>Script 中
try: for record in records: cc = record.value['credit_card'] if cc == '': error.write(record, "Payment type was CRD, but credit card was null") continue cc_type = '' if cc.startswith('4'): cc_type = 'Visa' elif cc.startswith(('51','52','53','54','55')): cc_type = 'MasterCard' elif cc.startswith(('34','37')): cc_type = 'AMEX' elif cc.startswith(('300','301','302','303','304','305','36','38')): cc_type = 'Diners Club' elif cc.startswith(('6011','65')): cc_type = 'Discover' elif cc.startswith(('2131','1800','35')): cc_type = 'JCB' else: cc_type = 'Other' record.value['credit_card_type'] = cc_type output.write(record)except Exception as e: error.write(record, e.message)
使用Field Masker来屏蔽信用卡的信息
配置写入目的地
流程预览测试
添加Expression Evaluator处理器
编辑流运行与输出
转载于:https://blog.51cto.com/flyfish225/2321515