只有光头才能变强。 文本已收录至我的GitHub精选文章,欢迎Star:https://github.com/ZhongFuCheng3y/3y 听说过大数据的同学应该都听说过Storm吧?其实我现在负责的系统用的就是Storm,在最开始接手系统的时候,我是完全不了解Storm的(现在其实也是一知半解而已) 由于最近在整理系统,所以顺便花了点时间入门了一下Storm(前几天花了点时间改了一下,上线以后一堆Bug,于是就果断回滚了。) 这篇文章来讲讲简单Storm的简单使用,没有复杂的东西。看完这篇文章,等到接手Storm的代码的时候你们**『大概』『应该』**能看懂Storm的代码。 我们首先进官方看一下Storm的介绍: Apache Storm is a free and open source distributed realtime computation system Storm是一个分布式的实时计算系统。 分布式:我在之前已经写过挺多的分布式的系统了,比如Kafka/HDFS/Elasticsearch等等。现在看到分布式这个词,三歪第一反应就是「它的存储或者计算交由多台服务器上完成,最后汇总起来达到最终的效果」。 实时:处理速度是毫秒级或者秒级的 计算:可以简单理解为对数据进行处理,比如清洗数据(对数据进行规整,取出有用的数据)。 我现在做的消息管理平台是可以推送各类的消息的(IM/PUSH/短信/微信消息等等),消息下发后,我们是肯定要知道这条消息的下发情况的(是否发送成功,如果用户没收到是由于什么原因导致用户没收到,消息是否被点击了等等)。 消息是否成功下发到用户上,这是运营和客服经常关心的问题。 消息下发的效果,这是运营非常关心的问题 基于上面问题,我们用了Storm做了一套自己的埋点方案,帮助我们快速确认消息是否成功下发到用户上以及统计消息下发的效果。 听起来好像很牛逼,下面我来讲讲背景,看完你就会发现一点儿都不难。 消息管理平台虽然看起来只是发消息的,但是系统设计还是有点东西的。我们以「微服务」的思想去看这个系统,会将不同的功能模块抽取到不同的系统的。 其中PUSH(推送)的链路是最长的,一条消息下发经过的后端系统就有7个,如图下: 这7个系统都有可能「干掉」了这条消息,导致用户没收到。如果我们每去查一个问题,都要逐一排查每个系统,那实在是太慢了。 很多时候客服反馈过来的问题都是当天的,甚至是前几分钟的,我们需要有一个及时的反馈给客服来帮助用户找到为什么收不到消息的原因。 于是我们要做两个功能: 如果是单纯查问题,我们将各个系统的日志收集到Kafka,然后写到Elasticsearch这个是完全没问题的(现在我们也是这么干的) 涉及到统计相关的,我们就有自己的一套埋点方案,这个是便于对数据的统计,也能完成部分排查的功能。 前面提到了「埋点」,实际上就是打日志。其实就是在关键的地方上打上日志做记录,方便排查问题。 比如,现在我们有7个系统,每个系统在执行消息的时候都会可能导致这条消息发不出去(可能是消息去重了,可能是用户的手机号不正确,可能是用户太久没有登录了等等都有可能)。我们在这些『关键位置』都打上日志,方便我们去排查。 这些「关键位置」我们都给它用简单的数字来命个名。比如说:我们用「11」来代表这个用户没有绑定手机号,用「12」来代表这个用户10分钟前收到了一条一模一样的消息,用「13」来代表这个用户屏蔽了消息….. 「11」「12」「13」「14」「15」「16」这些就叫做「点位」,把这些点位在关键的位置中打上日志,这个就叫做「埋点」 有了埋点,我们要做的就是将这些点位收集起来,然后统一处理成我们的格式,输出到数据源中。 OK,就是分三步: 收集日志我们有logAgent帮我们收集到Kafka,实时清洗日志我们用的就是Storm,清洗完我们输出到Redis(实时)/Hive(离线)。 Storm一般是在处理(清洗)那层,Storm的上下游也很明确了(上游是消息队列,下游写到各种数据源,这种是最常见的): Storm统一清洗出来放到Redis,我们就可以通过接口来很方便去查一条消息的整体下发情况,比如: 到这里,主要想说明我们通过Storm来实时清洗数据,下来来讲讲Storm的基本使用~ 我们从一段最简单的Storm代码入门,先看看下面的代码: 如果完全没看过Storm代码的同学,看到上面的代码会怎么分析?我是这样的: 我们简单搜一下,就可以发现它的流程大致是这样的: Spout是数据的源头,一般我们用它去接收数据,Spout接收到数据后往Bolt上发送,Bolt处理数据(清洗)。Bolt清洗完数据可以写到一个数据源或者传递给下一个Bolt继续清洗。 Topology关联了我们在程序中定义好的Spout和Bolt。各种 Spout 和 Bolt 连接在一起之后,就成了一个 Topology,一个 Topology 就是一个 Storm 应用。 Spout往Bolt传递数据,Bolt往Bolt传递数据,这个传递的过程叫做Stream,Stream传递的是一个一个Tuple。 现在问题来了,我们的Spout和Bolt之间是怎么关联起来的呢?Bolt和Bolt之间是怎么关联起来的呢? 在上面的图我们知道一个Topology会有多个Spout和多个Bolt,那我怎么知道这个Spout传递的数据是给这个Bolt,这个Bolt传递的数据是给另外一个Bolt?(说白了,就是上面图上的箭头是怎么关联的呢?) 在Storm中,有Grouping的机制,就是决定Spout的数据流向哪个Bolt,Bolt的数据流向下一个Bolt。 为了提高并发度,我们在setBolt的时候,可以指定Bolt的线程数,也就是所谓的Executor(Spout也同样可以指定线程数的,只是这次我拿Bolt来举例)。我们的结构可能会是这样的: 分组的策略有以下: shuffleGrouping策略我们是用得最多的,比如上面的图上有两个Spout,我们会将这两个Spout的Tuple均匀分发到各个Bolt中执行。 说到这里,我们再回头看看最开始的代码,我给补充一下注释,你们应该就能看得懂了: 我还是再画一个图吧: 入门的过程复杂吗?不复杂。说白了就是Spout接收到数据,通过grouping机制将Spout的数据传到给Bolt处理,Bolt处理完看还需不需要继续往下处理,如果需要就传递给下一个Bolt,不需要就写到数据源、调接口等等。 当我们提交任务之后,会发生什么呢?我们来看看。 流程大致如下: Nimbus和Supervisor都是节点(服务器),Storm用Zookeeper去管理Supervisor节点的信息。 Supervisor节点下会创建Worker进程,创建多少个Worker进程由Conf配置文件决定。线程Executor,由进程产生,用于执行任务,Executor线程数有多少个是在setBolt、setSpout的时候决定。Task是真正的任务执行者,Task其实就是包装了Bolt/Spout实例。 关于Worker、Executor、Task之间的关系,在官网有一个例子专门说明了,我们可以看看。先放出代码: 内部的图: 解释一下: 从上面我们可以知道 一般来说不会,因为很多情况下,一个线程是对应一个Task的(Task你可以理解为Bolt/Spout的实例),既然每个线程是处理自己的实例了,那当然不会有线程安全的问题啦。(当然了,你如果在Bolt/Spout中设置了静态成员变量,那还是会有线程安全问题) 这篇文章简单地介绍了一下Storm,Storm的东西其实还有很多,包括ack机制什么的。现在进官方找文档,都在主推Trident了,有兴趣的同学可以继续往下看。 话又说回来,我司也在主推Flink了,这块后续如果有迁移计划,我也准备学学搞搞,到时候再来入门文章。 参考资料: 求!!!
前言
什么是Storm
我们使用Storm做了什么?
需求背景
需求实现
Storm入门
Storm架构
conf.setNumWorkers(2)
代表会创建两个Worker进程setSpout("blue-spout", new BlueSpout(), 2)
蓝色Spout会有两个线程处理,因为有两个进程,所以一个进程会有一个蓝色Spout线程topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2).setNumTasks(4)
绿色Bolt会有两个线程处理,因为有两个进程Worker所以一个进程会有一个绿色Bolt线程。又因为设置了4个Task数,所以一个线程会分配两个绿色的TasktopologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6).shuffleGrouping("green-bolt")
。黄色Bolt会有6个线程处理,因为创建了两个进程,所以一个进程会有3个黄色Bolt线程。没有单独设置Task书,所以一个线程默认有一个Taskthreads ≤ tasks
线程数是肯定小于等于Task数的。有没有好奇宝宝会问:「Storm用了线程,那么会有线程不安全的情况吗?」(其实这是三歪刚学的疑问)最后
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算