Storm学习

Storm学习

1.导学

VMware Fusion
Mac上搭建:为了给大家演示如何使用我们的OOTB环境
Hadoop环境:虚拟机,我是远程登录
Mac
那么就不需要使用我们的OOTB环境
VMware Fusion+OOTB

Window:VMware

hadoop/hadoop
root用户的密码是什么?
修改配置文件,是需要root权限的,怎么办?
sudo command

只有一个地方需要修改:ip地址
/etc/hosts
192.168.199.128 hadoop000
192.168.199.128 localhost

2.初识实时流处理Storm

Apache Storm is a free and open source distributed realtime computation system
免费
开源
分布式
实时计算系统

Storm makes it easy to reliably process unbounded streams of data
unbounded: 无界
bounded: Hadoop/Spark SQL 离线(input …. output)

Storm has many use cases:
realtime analytics,
online machine learning,
continuous computation,
distributed RPC,
ETL, and more.

Storm特点
fast:a million tuples processed per second per node
scalable,
fault-tolerant,
guarantees your data will be processed
and is easy to set up and operate.

小结:Strom能实现高频数据和大规模数据的实时处理

Storm产生于BackType(被Twitter收购)公司

#…#

需求:大数据的实时处理

自己来实现实时系统,要考虑的因素:

1) 健壮性
2) 扩展性/分布式
3) 如何使得数据不丢失,不重复
4) 高性能、低延时

Storm开源
2011.9
Apache
Clojure Java

Storm技术网站

1) 官网: storm.apache.org
2) GitHub: github.com/apache/storm
3) wiki: https://en.wikipedia.org/wiki/Storm_(event_processor)

Storm vs Hadoop
数据源/处理领域
处理过程
Hadoop: Map Reduce
Storm: Spout Bolt
进程是否结束
处理速度
使用场景

发展趋势
1) 社区的发展、活跃度
2) 企业的需求
3) 大数据相关的大会, Storm主题的数量上升
4) 互联网 JStorm

3.Storm核心概念

核心概念
Topologies
拓扑,将整个流程串起来
Streams
流,数据流,水流
Spouts
产生数据/水的东西
Bolts
处理数据/水的东西 水壶/水桶
Tuple
数据/水

制约中国互联网发展的最大瓶颈是什么? 后厂村路
13号线:回龙观==>龙泽==>西二旗

Storm核心概念总结
Topology: 计算拓扑,由spout和bolt组成的
Stream:消息流,抽象概念,没有边界的tuple构成
Tuple:消息/数据 传递的基本单元
Spout:消息流的源头,Topology的消息生产者
Bolt:消息处理单元,可以做过滤、聚合、查询/写数据库的操作

4.Storm编程

搭建开发环境
jdk: 1.8
windows: exe
linux/mac(dmg): tar ….. 把jdk指定到系统环境变量(~/.bash_profile)
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home
export PATH=$JAVA_HOME/bin:$PATH

        source ~/.bash_profile
        echo $JAVA_HOME
        java -version
IDEA: 
Maven: 3.3+
    windows/linux/mac 下载安装包
    tar .... -C ~/app

    把maven指定到系统环境变量(~/.bash_profile)
    export MAVEN_HOME=/Users/rocky/app/apache-maven-3.3.9
    export PATH=$MAVEN_HOME/bin:$PATH
    source ~/.bash_profile
    echo $JAVA_HOME
    mvn -v

    调整maven依赖下载的jar所在位置: $MAVEN_HOME/conf/setting.xml
    <localRepository>/Users/rocky/maven_repos</localRepository>

在pom.xml中添加storm的maven依赖
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.1.1</version>
    </dependency>

ISpout
概述
核心接口(interface),负责将数据发送到topology中去处理
Storm会跟踪Spout发出去的tuple的DAG
ack/fail
tuple: message id
ack/fail/nextTuple是在同一个线程中执行的,所以不用考虑线程安全方面

核心方法
    open: 初始化操作
    close: 资源释放操作
    nextTuple: 发送数据   core api
    ack: tuple处理成功,storm会反馈给spout一个成功消息
    fail:tuple处理失败,storm会发送一个消息给spout,处理失败

实现类
    public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
    public interface IRichSpout extends ISpout, IComponent 
    DRPCSpout
    ShellSpout

IComponent接口
概述:
public interface IComponent extends Serializable
为topology中所有可能的组件提供公用的方法

    void declareOutputFields(OutputFieldsDeclarer declarer);
    用于声明当前Spout/Bolt发送的tuple的名称
    使用OutputFieldsDeclarer配合使用


实现类:
public abstract class BaseComponent implements IComponent

IBolt接口
概述
职责:接收tuple处理,并进行相应的处理(filter/join/….)
hold住tuple再处理
IBolt会在一个运行的机器上创建,使用Java序列化它,然后提交到主节点(nimbus)上去执行
nimbus会启动worker来反序列化,调用prepare方法,然后才开始处理tuple处理

方法
    prepare:初始化
    execute:处理一个tuple暑假,tuple对象中包含了元数据信息
    cleanup:shutdown之前的资源清理操作


实现类:
    public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
    public interface IRichBolt extends IBolt, IComponent 
    RichShellBolt

求和案例
需求:1 + 2 + 3 + …. = ???
实现方案:
Spout发送数字作为input
使用Bolt来处理业务逻辑:求和
将结果输出到控制台
拓扑设计: DataSourceSpout –> SumBolt

词频统计
需求:读取指定目录的数据,并实现单词计数功能
实现方案:
Spout来读取指定目录的数据,作为后续Bolt处理的input
使用一个Bolt把input的数据,切割开,我们按照逗号进行分割
使用一个Bolt来进行最终的单词的次数统计操作
并输出
拓扑设计: DataSourceSpout ==> SplitBolt ==> CountBolt

Storm编程注意事项

1) Exception in thread “main” java.lang.IllegalArgumentException: Spout has already been declared for id DataSourceSpout
2) org.apache.storm.generated.InvalidTopologyException: null
3) topology的名称不是重复: local似乎没问题, 等我们到集群测试的时候再来验证这个问题

5.Storm周边框架使用

环境前置说明:
通过我们的客户端(终端,CRT,XShell)
ssh hadoop@hadoop000
ssh hadoop@192.168.199.102

远程服务器的用户名是hadoop,密码也是hadoop
有没有提供root权限,sudo command

hadoop000(192.168.199.102)是远程服务器的hostname

如果你想在本地通过ssh hadoop@hadoop000远程登录,
那么你本地的hosts肯定要添加ip和hostname的映射

192.168.199.102  hadoop000

JDK的安装
将所有的软件都安装到~/app
tar -zxvf jdk-8u91-linux-x64.tar.gz -C ~/app/

建议将jdk的bin目录配置到系统环境变量中: ~/.bash_profile
    export JAVA_HOME=/home/hadoop/app/jdk1.8.0_91
    export PATH=$JAVA_HOME/bin:$PATH

让系统环境变量生效
    source ~/.bash_profile

验证
    java -version

ZooKeeper安装
下载ZK的安装包:http://archive.cloudera.com/cdh5/cdh/5/
解压:tar -zxvf zookeeper-3.4.5-cdh5.7.0.tar.gz -C ~/app/
建议ZK_HOME/bin添加到系统环境变量: ~/.bash_profile
export ZK_HOME=/home/hadoop/app/zookeeper-3.4.5-cdh5.7.0
export PATH=$ZK_HOME/bin:$PATH
让系统环境变量生效
source ~/.bash_profile
修改ZK的配置: $ZK_HOME/conf/zoo.cfg
dataDir=/home/hadoop/app/tmp/zookeeper
启动zk: $ZK_HOME/bin/
zkServer.sh start
验证: jps
多了一个QuorumPeerMain进程,就表示zk启动成功了

ELK:
www.elastic.co

Kafka概述
和消息系统类似

消息中间件:生产者和消费者

妈妈:生产者
你:消费者
馒头:数据流、消息

    正常情况下: 生产一个  消费一个
    其他情况:  
        一直生产,你吃到某一个馒头时,你卡主(机器故障), 馒头就丢失了
        一直生产,做馒头速度快,你吃来不及,馒头也就丢失了

    拿个碗/篮子,馒头做好以后先放到篮子里,你要吃的时候去篮子里面取出来吃

篮子/框: Kafka
    当篮子满了,馒头就装不下了,咋办? 
    多准备几个篮子 === Kafka的扩容

Kafka架构
producer:生产者,就是生产馒头(老妈)
consumer:消费者,就是吃馒头的(你)
broker:篮子
topic:主题,给馒头带一个标签,topica的馒头是给你吃的,topicb的馒头是给你弟弟吃

单节点单broker的部署及使用

$KAFKA_HOME/config/server.properties
broker.id=0
listeners
host.name
log.dirs
zookeeper.connect

启动Kafka
kafka-server-start.sh
USAGE: /home/hadoop/app/kafka_2.11-0.9.0.0/bin/kafka-server-start.sh [-daemon] server.properties [–override property=value]*

kafka-server-start.sh $KAFKA_HOME/config/server.properties

创建topic: zk
kafka-topics.sh –create –zookeeper hadoop000:2181 –replication-factor 1 –partitions 1 –topic hello_topic

查看所有topic
kafka-topics.sh –list –zookeeper hadoop000:2181

发送消息: broker
kafka-console-producer.sh –broker-list hadoop000:9092 –topic hello_topic

消费消息: zk
kafka-console-consumer.sh –zookeeper hadoop000:2181 –topic hello_topic –from-beginning

–from-beginning的使用

查看所有topic的详细信息:kafka-topics.sh –describe –zookeeper hadoop000:2181
查看指定topic的详细信息:kafka-topics.sh –describe –zookeeper hadoop000:2181 –topic hello_topic

单节点多broker
server-1.properties
log.dirs=/home/hadoop/app/tmp/kafka-logs-1
listeners=PLAINTEXT://:9093
broker.id=1

server-2.properties
log.dirs=/home/hadoop/app/tmp/kafka-logs-2
listeners=PLAINTEXT://:9094
broker.id=2

server-3.properties
log.dirs=/home/hadoop/app/tmp/kafka-logs-3
listeners=PLAINTEXT://:9095
broker.id=3

kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &

kafka-topics.sh –create –zookeeper hadoop000:2181 –replication-factor 3 –partitions 1 –topic my-replicated-topic

kafka-console-producer.sh –broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095 –topic my-replicated-topic
kafka-console-consumer.sh –zookeeper hadoop000:2181 –topic my-replicated-topic

kafka-topics.sh –describe –zookeeper hadoop000:2181 –topic my-replicated-topic

6.Storm架构及部署

Storm架构
类似于Hadoop的架构,主从(Master/Slave)
Nimbus: 主
集群的主节点,负责任务(task)的指派和分发、资源的分配
Supervisor: 从
可以启动多个Worker,具体几个呢?可以通过配置来指定
一个Topo可以运行在多个Worker之上,也可以通过配置来指定
集群的从节点,(负责干活的),负责执行任务的具体部分
启动和停止自己管理的Worker进程
无状态,在他们上面的信息(元数据)会存储在ZK中
Worker: 运行具体组件逻辑(Spout/Bolt)的进程
=====================分割线===================
task:
Spout和Bolt
Worker中每一个Spout和Bolt的线程称为一个Task
executor: spout和bolt可能会共享一个线程

Storm部署的前置条件
jdk7+
python2.6.6+

我们课程使用的Storm版本是:1.1.1

Storm部署
下载
解压到/app
添加到系统环境变量:
/.bash_profile
export STORM_HOME=/home/hadoop/app/apache-storm-1.1.1
export PATH=$STORM_HOME/bin:$PATH
使其生效: source ~/.bash_profile

目录结构
    bin
    examples
    conf
    lib

Storm启动
$STORM_HOME/bin/storm 如何使用 执行storm就能看到很多详细的命令
dev-zookeeper 启动zk
storm dev-zookeeper 前台启动
nohup sh storm dev-zookeeper &
jps : dev_zookeeper
nimbus 启动主节点
nohup sh storm nimbus &
supervisor 启动从节点
nohup sh storm supervisor &
ui 启动UI界面
nohup sh storm ui &
logviewer 启动日志查看服务
nohup sh storm logviewer &

注意事项
1) 为什么是4个slot
2) 为什么有2个Nimbus

Storm如何运行我们自己开发的应用程序呢?
Syntax: storm jar topology-jar-path class args0 args1 args2

storm jar /home/hadoop/lib/storm-1.0.jar com.imooc.bigdata.ClusterSumStormTopology

问题: 3个executor,那么页面就看到spout1个和bolt1个,那么还有一个去哪了?

如何修改将跑在本地的storm app改成运行在集群上的
StormSubmitter.submitTopology(topoName,new Config(), builder.createTopology());

storm 其他命令的使用

list
        Syntax: storm list
        List the running topologies and their statuses.

如何停止作业
    kill
        Syntax: storm kill topology-name [-w wait-time-secs]

如何停止集群
    hadoop: stop-all.sh
    kill -9 pid,pid....

Storm集群的部署规划
hadoop000 192.168.199.102
hadoop001 192.168.199.247
hadoop002 192.168.199.138

每台机器的host映射:/etc/hosts
    192.168.199.102 hadoop000
    192.168.199.247 hadoop001
    192.168.199.138 hadoop002


hadoop000: zk nimbus  supervisor
hadoop001: zk           supervisor
hadoop002: zk         supervisor

安装包的分发: 从hadoop000机器做为出发点
scp xxxx hadoop@hadoop001:/software
scp xxxx hadoop@hadoop002:
/software

jdk的安装
解压
配置到系统环境变量
验证

ZK分布式环境的安装
server.1=hadoop000:2888:3888
server.2=hadoop001:2888:3888
server.3=hadoop002:2888:3888

hadoop000的dataDir目录: myid的值1
hadoop001的dataDir目录: myid的值2
hadoop002的dataDir目录: myid的值3

在每个节点上启动zk: zkServer.sh start
在每个节点上查看当前机器zk的状态: zkServer.sh status

Storm集群
$STORM_HOME/conf/storm.yaml
storm.zookeeper.servers:
- “hadoop000”
- “hadoop001”
- “hadoop002”

storm.local.dir: "/home/hadoop/app/tmp/storm"

supervisor.slots.ports:
     - 6700
     - 6701
     - 6702
     - 6703

启动
hadoop000: nimbus supervisor(ui,logviewer)
hadoop001: supervisor(logviewer)
hadoop002: supervisor(logviewer)

nimbus 启动主节点
nohup sh storm nimbus &
supervisor 启动从节点
nohup sh storm supervisor &
ui 启动UI界面
nohup sh storm ui &
logviewer 启动日志查看服务
nohup sh storm logviewer &

启动完所有的进程之后,查看
[hadoop@hadoop000 bin]$ jps
7424 QuorumPeerMain
8164 Supervisor
7769 nimbus
8380 logviewer
7949 core

[hadoop@hadoop001 bin]$ jps
3142 logviewer
2760 QuorumPeerMain
2971 Supervisor

[hadoop@hadoop002 bin]$ jps
3106 logviewer
2925 Supervisor
2719 QuorumPeerMain

storm jar /home/hadoop/lib/storm-1.0.jar com.imooc.bigdata.ClusterSumStormTopology

目录树
storm.local.dir
nimbus/inbox:stormjar-….jar
supervisor/stormdist
ClusterSumStormTopology-1-1511599690
│ │ ├── stormcode.ser
│ │ ├── stormconf.ser
│ │ └── stormjar.jar

7.并行度

并行度
一个worker进程执行的是一个topo的子集
一个worker进程会启动1..n个executor线程来执行一个topo的component
一个运行的topo就是由集群中多台物理机上的多个worker进程组成

executor是一个被worker进程启动的单独线程,每个executor只会运行1个topo的一个component
task是最终运行spout或者bolt代码的最小执行单元

默认:
    一个supervisor节点最多启动4个worker进程  ?
    每一个topo默认占用一个worker进程         ?
    每个worker进程会启动一个executor        ?
    每个executor启动一个task                ?

Total slots:4
Executors: 3 ???? spout + bolt = 2 why 3?
acker 导致的

8.分组策略

A stream grouping defines how that stream should be partitioned among the bolt’s tasks

storm jar /home/had/lib/storm-1.0.jar com.imooc.bigdata.ClusterSumShuffleGroupingStormTopology

9.Storm可靠性