Storm学习
1.导学
VMware Fusion
Mac上搭建:为了给大家演示如何使用我们的OOTB环境
Hadoop环境:虚拟机,我是远程登录
Mac
那么就不需要使用我们的OOTB环境
VMware Fusion+OOTB
Window:VMwarehadoop/hadoop
root用户的密码是什么?
修改配置文件,是需要root权限的,怎么办?
sudo command
只有一个地方需要修改:ip地址
/etc/hosts
192.168.199.128 hadoop000
192.168.199.128 localhost
2.初识实时流处理Storm
Apache Storm is a free and open source distributed realtime computation system
免费
开源
分布式
实时计算系统
Storm makes it easy to reliably process unbounded streams of data
unbounded: 无界
bounded: Hadoop/Spark SQL 离线(input …. output)
Storm has many use cases:
realtime analytics,
online machine learning,
continuous computation,
distributed RPC,
ETL, and more.
Storm特点
fast:a million tuples processed per second per node
scalable,
fault-tolerant,
guarantees your data will be processed
and is easy to set up and operate.
小结:Strom能实现高频数据和大规模数据的实时处理
Storm产生于BackType(被Twitter收购)公司
#…#
需求:大数据的实时处理
自己来实现实时系统,要考虑的因素:
1) 健壮性
2) 扩展性/分布式
3) 如何使得数据不丢失,不重复
4) 高性能、低延时
Storm开源
2011.9
Apache
Clojure Java
Storm技术网站
1) 官网: storm.apache.org
2) GitHub: github.com/apache/storm
3) wiki: https://en.wikipedia.org/wiki/Storm_(event_processor)
Storm vs Hadoop
数据源/处理领域
处理过程
Hadoop: Map Reduce
Storm: Spout Bolt
进程是否结束
处理速度
使用场景
发展趋势
1) 社区的发展、活跃度
2) 企业的需求
3) 大数据相关的大会, Storm主题的数量上升
4) 互联网 JStorm
3.Storm核心概念
核心概念
Topologies
拓扑,将整个流程串起来
Streams
流,数据流,水流
Spouts
产生数据/水的东西
Bolts
处理数据/水的东西 水壶/水桶
Tuple
数据/水
制约中国互联网发展的最大瓶颈是什么? 后厂村路
13号线:回龙观==>龙泽==>西二旗
Storm核心概念总结
Topology: 计算拓扑,由spout和bolt组成的
Stream:消息流,抽象概念,没有边界的tuple构成
Tuple:消息/数据 传递的基本单元
Spout:消息流的源头,Topology的消息生产者
Bolt:消息处理单元,可以做过滤、聚合、查询/写数据库的操作


4.Storm编程
搭建开发环境
jdk: 1.8
windows: exe
linux/mac(dmg): tar ….. 把jdk指定到系统环境变量(~/.bash_profile)
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home
export PATH=$JAVA_HOME/bin:$PATH
source ~/.bash_profile
echo $JAVA_HOME
java -version
IDEA:
Maven: 3.3+
windows/linux/mac 下载安装包
tar .... -C ~/app
把maven指定到系统环境变量(~/.bash_profile)
export MAVEN_HOME=/Users/rocky/app/apache-maven-3.3.9
export PATH=$MAVEN_HOME/bin:$PATH
source ~/.bash_profile
echo $JAVA_HOME
mvn -v
调整maven依赖下载的jar所在位置: $MAVEN_HOME/conf/setting.xml
<localRepository>/Users/rocky/maven_repos</localRepository>
在pom.xml中添加storm的maven依赖
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
</dependency>ISpout
概述
核心接口(interface),负责将数据发送到topology中去处理
Storm会跟踪Spout发出去的tuple的DAG
ack/fail
tuple: message id
ack/fail/nextTuple是在同一个线程中执行的,所以不用考虑线程安全方面
核心方法
open: 初始化操作
close: 资源释放操作
nextTuple: 发送数据 core api
ack: tuple处理成功,storm会反馈给spout一个成功消息
fail:tuple处理失败,storm会发送一个消息给spout,处理失败
实现类
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
public interface IRichSpout extends ISpout, IComponent
DRPCSpout
ShellSpoutIComponent接口
概述:
public interface IComponent extends Serializable
为topology中所有可能的组件提供公用的方法
void declareOutputFields(OutputFieldsDeclarer declarer);
用于声明当前Spout/Bolt发送的tuple的名称
使用OutputFieldsDeclarer配合使用
实现类:
public abstract class BaseComponent implements IComponentIBolt接口
概述
职责:接收tuple处理,并进行相应的处理(filter/join/….)
hold住tuple再处理
IBolt会在一个运行的机器上创建,使用Java序列化它,然后提交到主节点(nimbus)上去执行
nimbus会启动worker来反序列化,调用prepare方法,然后才开始处理tuple处理
方法
prepare:初始化
execute:处理一个tuple暑假,tuple对象中包含了元数据信息
cleanup:shutdown之前的资源清理操作
实现类:
public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
public interface IRichBolt extends IBolt, IComponent
RichShellBolt求和案例
需求:1 + 2 + 3 + …. = ???
实现方案:
Spout发送数字作为input
使用Bolt来处理业务逻辑:求和
将结果输出到控制台
拓扑设计: DataSourceSpout –> SumBolt
词频统计
需求:读取指定目录的数据,并实现单词计数功能
实现方案:
Spout来读取指定目录的数据,作为后续Bolt处理的input
使用一个Bolt把input的数据,切割开,我们按照逗号进行分割
使用一个Bolt来进行最终的单词的次数统计操作
并输出
拓扑设计: DataSourceSpout ==> SplitBolt ==> CountBolt
Storm编程注意事项
1) Exception in thread “main” java.lang.IllegalArgumentException: Spout has already been declared for id DataSourceSpout
2) org.apache.storm.generated.InvalidTopologyException: null
3) topology的名称不是重复: local似乎没问题, 等我们到集群测试的时候再来验证这个问题

5.Storm周边框架使用
环境前置说明:
通过我们的客户端(终端,CRT,XShell)
ssh hadoop@hadoop000
ssh hadoop@192.168.199.102
远程服务器的用户名是hadoop,密码也是hadoop
有没有提供root权限,sudo command
hadoop000(192.168.199.102)是远程服务器的hostname
如果你想在本地通过ssh hadoop@hadoop000远程登录,
那么你本地的hosts肯定要添加ip和hostname的映射
192.168.199.102 hadoop000JDK的安装
将所有的软件都安装到~/app
tar -zxvf jdk-8u91-linux-x64.tar.gz -C ~/app/
建议将jdk的bin目录配置到系统环境变量中: ~/.bash_profile
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_91
export PATH=$JAVA_HOME/bin:$PATH
让系统环境变量生效
source ~/.bash_profile
验证
java -versionZooKeeper安装
下载ZK的安装包:http://archive.cloudera.com/cdh5/cdh/5/
解压:tar -zxvf zookeeper-3.4.5-cdh5.7.0.tar.gz -C ~/app/
建议ZK_HOME/bin添加到系统环境变量: ~/.bash_profile
export ZK_HOME=/home/hadoop/app/zookeeper-3.4.5-cdh5.7.0
export PATH=$ZK_HOME/bin:$PATH
让系统环境变量生效
source ~/.bash_profile
修改ZK的配置: $ZK_HOME/conf/zoo.cfg
dataDir=/home/hadoop/app/tmp/zookeeper
启动zk: $ZK_HOME/bin/
zkServer.sh start
验证: jps
多了一个QuorumPeerMain进程,就表示zk启动成功了
ELK:
www.elastic.co
Kafka概述
和消息系统类似
消息中间件:生产者和消费者
妈妈:生产者
你:消费者
馒头:数据流、消息
正常情况下: 生产一个 消费一个
其他情况:
一直生产,你吃到某一个馒头时,你卡主(机器故障), 馒头就丢失了
一直生产,做馒头速度快,你吃来不及,馒头也就丢失了
拿个碗/篮子,馒头做好以后先放到篮子里,你要吃的时候去篮子里面取出来吃
篮子/框: Kafka
当篮子满了,馒头就装不下了,咋办?
多准备几个篮子 === Kafka的扩容Kafka架构
producer:生产者,就是生产馒头(老妈)
consumer:消费者,就是吃馒头的(你)
broker:篮子
topic:主题,给馒头带一个标签,topica的馒头是给你吃的,topicb的馒头是给你弟弟吃
单节点单broker的部署及使用
$KAFKA_HOME/config/server.properties
broker.id=0
listeners
host.name
log.dirs
zookeeper.connect
启动Kafka
kafka-server-start.sh
USAGE: /home/hadoop/app/kafka_2.11-0.9.0.0/bin/kafka-server-start.sh [-daemon] server.properties [–override property=value]*
kafka-server-start.sh $KAFKA_HOME/config/server.properties
创建topic: zk
kafka-topics.sh –create –zookeeper hadoop000:2181 –replication-factor 1 –partitions 1 –topic hello_topic
查看所有topic
kafka-topics.sh –list –zookeeper hadoop000:2181
发送消息: broker
kafka-console-producer.sh –broker-list hadoop000:9092 –topic hello_topic
消费消息: zk
kafka-console-consumer.sh –zookeeper hadoop000:2181 –topic hello_topic –from-beginning
–from-beginning的使用
查看所有topic的详细信息:kafka-topics.sh –describe –zookeeper hadoop000:2181
查看指定topic的详细信息:kafka-topics.sh –describe –zookeeper hadoop000:2181 –topic hello_topic
单节点多broker
server-1.properties
log.dirs=/home/hadoop/app/tmp/kafka-logs-1
listeners=PLAINTEXT://:9093
broker.id=1
server-2.properties
log.dirs=/home/hadoop/app/tmp/kafka-logs-2
listeners=PLAINTEXT://:9094
broker.id=2
server-3.properties
log.dirs=/home/hadoop/app/tmp/kafka-logs-3
listeners=PLAINTEXT://:9095
broker.id=3
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &
kafka-topics.sh –create –zookeeper hadoop000:2181 –replication-factor 3 –partitions 1 –topic my-replicated-topic
kafka-console-producer.sh –broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095 –topic my-replicated-topic
kafka-console-consumer.sh –zookeeper hadoop000:2181 –topic my-replicated-topic
kafka-topics.sh –describe –zookeeper hadoop000:2181 –topic my-replicated-topic
6.Storm架构及部署
Storm架构
类似于Hadoop的架构,主从(Master/Slave)
Nimbus: 主
集群的主节点,负责任务(task)的指派和分发、资源的分配
Supervisor: 从
可以启动多个Worker,具体几个呢?可以通过配置来指定
一个Topo可以运行在多个Worker之上,也可以通过配置来指定
集群的从节点,(负责干活的),负责执行任务的具体部分
启动和停止自己管理的Worker进程
无状态,在他们上面的信息(元数据)会存储在ZK中
Worker: 运行具体组件逻辑(Spout/Bolt)的进程
=====================分割线===================
task:
Spout和Bolt
Worker中每一个Spout和Bolt的线程称为一个Task
executor: spout和bolt可能会共享一个线程

Storm部署的前置条件
jdk7+
python2.6.6+
我们课程使用的Storm版本是:1.1.1
Storm部署
下载
解压到/app/.bash_profile
添加到系统环境变量:
export STORM_HOME=/home/hadoop/app/apache-storm-1.1.1
export PATH=$STORM_HOME/bin:$PATH
使其生效: source ~/.bash_profile
目录结构
bin
examples
conf
libStorm启动
$STORM_HOME/bin/storm 如何使用 执行storm就能看到很多详细的命令
dev-zookeeper 启动zk
storm dev-zookeeper 前台启动
nohup sh storm dev-zookeeper &
jps : dev_zookeeper
nimbus 启动主节点
nohup sh storm nimbus &
supervisor 启动从节点
nohup sh storm supervisor &
ui 启动UI界面
nohup sh storm ui &
logviewer 启动日志查看服务
nohup sh storm logviewer &
注意事项
1) 为什么是4个slot
2) 为什么有2个Nimbus
Storm如何运行我们自己开发的应用程序呢?
Syntax: storm jar topology-jar-path class args0 args1 args2
storm jar /home/hadoop/lib/storm-1.0.jar com.imooc.bigdata.ClusterSumStormTopology
问题: 3个executor,那么页面就看到spout1个和bolt1个,那么还有一个去哪了?
如何修改将跑在本地的storm app改成运行在集群上的
StormSubmitter.submitTopology(topoName,new Config(), builder.createTopology());
storm 其他命令的使用
list
Syntax: storm list
List the running topologies and their statuses.
如何停止作业
kill
Syntax: storm kill topology-name [-w wait-time-secs]
如何停止集群
hadoop: stop-all.sh
kill -9 pid,pid....Storm集群的部署规划
hadoop000 192.168.199.102
hadoop001 192.168.199.247
hadoop002 192.168.199.138
每台机器的host映射:/etc/hosts
192.168.199.102 hadoop000
192.168.199.247 hadoop001
192.168.199.138 hadoop002
hadoop000: zk nimbus supervisor
hadoop001: zk supervisor
hadoop002: zk supervisor安装包的分发: 从hadoop000机器做为出发点
scp xxxx hadoop@hadoop001:/software/software
scp xxxx hadoop@hadoop002:
jdk的安装
解压
配置到系统环境变量
验证
ZK分布式环境的安装
server.1=hadoop000:2888:3888
server.2=hadoop001:2888:3888
server.3=hadoop002:2888:3888
hadoop000的dataDir目录: myid的值1
hadoop001的dataDir目录: myid的值2
hadoop002的dataDir目录: myid的值3
在每个节点上启动zk: zkServer.sh start
在每个节点上查看当前机器zk的状态: zkServer.sh status
Storm集群
$STORM_HOME/conf/storm.yaml
storm.zookeeper.servers:
- “hadoop000”
- “hadoop001”
- “hadoop002”
storm.local.dir: "/home/hadoop/app/tmp/storm"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703启动
hadoop000: nimbus supervisor(ui,logviewer)
hadoop001: supervisor(logviewer)
hadoop002: supervisor(logviewer)
nimbus 启动主节点
nohup sh storm nimbus &
supervisor 启动从节点
nohup sh storm supervisor &
ui 启动UI界面
nohup sh storm ui &
logviewer 启动日志查看服务
nohup sh storm logviewer &
启动完所有的进程之后,查看
[hadoop@hadoop000 bin]$ jps
7424 QuorumPeerMain
8164 Supervisor
7769 nimbus
8380 logviewer
7949 core
[hadoop@hadoop001 bin]$ jps
3142 logviewer
2760 QuorumPeerMain
2971 Supervisor
[hadoop@hadoop002 bin]$ jps
3106 logviewer
2925 Supervisor
2719 QuorumPeerMain
storm jar /home/hadoop/lib/storm-1.0.jar com.imooc.bigdata.ClusterSumStormTopology
目录树
storm.local.dir
nimbus/inbox:stormjar-….jar
supervisor/stormdist
ClusterSumStormTopology-1-1511599690
│ │ ├── stormcode.ser
│ │ ├── stormconf.ser
│ │ └── stormjar.jar
7.并行度
并行度
一个worker进程执行的是一个topo的子集
一个worker进程会启动1..n个executor线程来执行一个topo的component
一个运行的topo就是由集群中多台物理机上的多个worker进程组成
executor是一个被worker进程启动的单独线程,每个executor只会运行1个topo的一个component
task是最终运行spout或者bolt代码的最小执行单元
默认:
一个supervisor节点最多启动4个worker进程 ?
每一个topo默认占用一个worker进程 ?
每个worker进程会启动一个executor ?
每个executor启动一个task ?Total slots:4
Executors: 3 ???? spout + bolt = 2 why 3?
acker 导致的
8.分组策略
A stream grouping defines how that stream should be partitioned among the bolt’s tasks
storm jar /home/had/lib/storm-1.0.jar com.imooc.bigdata.ClusterSumShuffleGroupingStormTopology
9.Storm可靠性
