flume+kafka+storm整合实现实时计算小案例-程序员宅基地

    我们做数据分析的时候常常会遇到这样两个场景,一个是统计历史数据,这个就是要分析历史保存的日志。我们会使用hadoop,具体框架可以设计为:
1.flume收集日志;
2.HDFS输入路径存储日志;
3.MapReduce计算,将结果输出到HDFS输出路径;
4.hive+sqoop实现将结果转储到mysql
5.我们会使用crontab定时执行一个脚本来做

具体这里就不展开来说了,我会在另一个帖子讲到。这里我们详细介绍第二个场景:实时计算。这个用的比较多的如天猫双十一实时展示交易额,还有比如说银行等。

     实现实时计算要用到storm或者spark等,这里我介绍flume+kafka+storm方案。


使用flume采集日志数据,flume是一个分布式、可靠和高可用的海量日志采集、聚合和传输的系统。它的核心是一个agent,其中包含3个组件,source、channel和sink。Agent会监控日志目录,通过source组件将日志搜集到channel中缓存起来,当sink处理完之后会将缓存的记录删除,已经扫描过的文件会添加.COMPLETED后缀,下次不会重新扫描该文件。
因为日志搜集的速度和日志处理的速度是不一样的,所以加了一个kafka组件,其实也是作为一个缓冲的作用。Sink将日志中一行数据作为消息发布到kafka中,storm通过kafkaSpout来消费kafka消息,storm消费完一条消息需要对kafkaSpout回应(比如我们自定义bolt时如果是继承BaseRichBolt,需要显示调用collector.ack(tuple)和collector.fail(tuple)),这样才不会重复消费消息。
Storm从kafka中拿到一条数据,通过解析字符串,对日志做一定的日志清洗工作,然后计算之后可以将数据存到mysql,然后就可以在前端展示了。
我们需要自定义DailyStatisticsAnalysisTopology来处理任务代码如下:
pom.xml
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- kafka整合storm -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.30</version>
</dependency>
</dependencies>
<build>
<finalName>Storm_DailyStatisticsAnalysis</finalName>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.wyd.kafkastorm.DailyStatisticsAnalysisTopology</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
-----------------------------------------------------------------------------------------------
public class DailyStatisticsAnalysisTopology {
private static String topicName = "dailyStatisticsAnalysis";
private static String zkRoot = "/stormKafka/"+topicName;
public static void main(String[] args) {
BrokerHosts hosts = new ZkHosts("192.168.*.*:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts,topicName,zkRoot,UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaSpout",kafkaSpout);
builder.setBolt("dailyStatisticsAnalysisBolt", new DailyStatisticsAnalysisBolt (), 2).shuffleGrouping("kafkaSpout");
Config conf = new Config();
conf.setDebug(true);
if(args != null && args.length > 0) {
conf.setNumWorkers(1);
try {
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}catch (Exception e) {
e.printStackTrace();
}
} else {
conf.setMaxSpoutPending(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("dailyAnalysis", conf, builder.createTopology());
}
}
}
-----------------------------------------------------------------------------
public class DailyStatisticsAnalysisBolt extends BaseRichBolt {

/**
*
*/
private static final long serialVersionUID = 2262767962772699286L;
private OutputCollector _collector;
LogInfoHandler loginfohandler = new LogInfoHandler();
@Override
public void execute(Tuple tuple) {
// 存入mysql
try{
String value = tuple.getString(0);
loginfohandler.splitHandl(value);
DbUtil.insert(loginfohandler.getTarget(), loginfohandler.getTime(),
loginfohandler.getDistrictServer(), loginfohandler.getChannel(), loginfohandler.getCounts());
_collector.ack(tuple);
}catch(Exception e){
_collector.fail(tuple);
e.printStackTrace();
}
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this._collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

}
-----------------------------------------------------------------------------
进入maven项目的根目录执行mvn assembly:assembly
打包成功后会生成两个jar包:

将他们上传到storm目录下,执行 nohup bin/storm jar Storm_DailyStatisticsAnalysis.jar com.wyd.kafkastorm.DailyStatisticsAnalysisTopology &

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/hwq_19890818/article/details/80415473

智能推荐

mysql 汉字拼音首字母查询_mysql数据库中查询汉字的拼音首字母-程序员宅基地

文章浏览阅读155次。mysql数据库中查询汉字的拼音首字母create table wkcx_cosler(f_PY char primary key,cBegin SMALLINT UNSIGNED not null,cEnd SMALLINT UNSIGNED not null);INSERT INTO wkcx_cosler VALUES('A',0xB0A1,0xB0C4),('B',0xB0C5,0...

WEB前端-程序员宅基地

文章浏览阅读53次。1

itkGDCMSeriesFileNames how to read filenames_gdcmseriesfilenames (0x5571a9ffc060): no series we-程序员宅基地

文章浏览阅读1.2k次。Detailed DescriptionGenerate a sequence of filenames from a DICOM series.This class generate a sequence of files whose filenames points to a DICOM file. The ordering is based on the follow_gdcmseriesfilenames (0x5571a9ffc060): no series were found

FC-SAN vs. IP-SAN技术比较_fc存储比ip存储贵-程序员宅基地

文章浏览阅读1.4k次。从存储设备的结构来看  一般来说IP-SAN存储设备的磁盘控制器不是采用FC-SAN存储设备中的硬件RAID芯片+中央处理器的结构,而是采用每个磁盘柜中分为多个磁盘组,而每个磁盘组由一个微处理芯片控制所有的磁盘RAID操作(采用软件计算,效率较低)和RAID组的管理操作。这样一来_fc存储比ip存储贵

pandas库中loc()与iloc()提取数据介绍-程序员宅基地

文章浏览阅读2.2k次。本文转载自https://blog.csdn.net/W_weiying/article/details/81411257#commentBoxloc函数:通过行索引 “Index” 中的具体值来取行数据(如取"Index"为"A"的行)iloc函数:通过行号来取行数据(如取第二行的数据)1、利用loc,iloc提取行数据import numpy as npimport pandas a...

PCONV:移动设备上的DNN权重剪枝中的稀疏性_dnn设备-程序员宅基地

文章浏览阅读504次。基于深度神经网络(DNN)的模型压缩技术已被广泛认为是在各种平台上实现加速的有效方法,而DNN权重剪枝是一种简单有效的模型压缩方法。目前有两种主流的剪枝方法代表了剪枝的两个极端:非结构化、细粒度的剪枝可以实现高稀疏性和准确性,但对硬件不友好;结构化、粗粒度的剪枝利用了硬件高效的结构,但当压缩率高时,精度会下降。在本文中,我们介绍了PCONV,它包括一个新的稀疏性维度,即粗粒度结构中的细粒度修剪模式。_dnn设备

随便推点

大数据hadoop集群搭建_大数据集群搭建-程序员宅基地

文章浏览阅读2.4k次,点赞2次,收藏25次。安装一个Linux系统 配置网卡 重启网络服务 ping baidu 修改主机名 关闭防火墙 安装ssh客户端 克隆Linux系统 对克隆好的系统配置网卡 ssh链接及免密登录 安装JDK 安装hadoop share中doc可删除 - vi /etc/hadoop/hadoop-env.sh JAVA_HOME在哪里 - export JAVAHOME改为绝对路..._大数据集群搭建

其实用setFieldsValue或者获取setState方法都可以设置DatePicker的默认值。_.setfieldsvalue 初始值-程序员宅基地

文章浏览阅读1.9k次。其实用setFieldsValue或者获取setState方法都可以设置DatePicker的默认值。setState方法,利用FormItem组件的initialValue属性设置默认值:this.setState({defaultDate}) // 自定义默认日期<FormItem label="日期选择框"> {getFieldDecorator('date..._.setfieldsvalue 初始值

FasterNet(PConv)paper笔记(CVPR2023)_蓝羽飞鸟的博客-程序员宅基地

文章浏览阅读609次,点赞3次,收藏4次。PConv,FasterNet笔记

使用CLion做嵌入式开发方法步骤详解?_clion嵌入式-程序员宅基地

文章浏览阅读1.3w次,点赞2次,收藏15次。本文简要介绍了如何使用CLion做STM32上的嵌入式开发。整个开发流程不离开CLion环境,提高工作效率。  JetBrain家的开发工具基本都是最棒的,对编程语言、框架支持得最好。CLion经过几年的发展,从无到有,成为了一个越来越完善的C++ IDE。在官方论坛上,用户呼声很高的一个需求就是remote debugging、嵌入式开发。  在最近几次更新里,CLion逐渐添加_clion嵌入式

Listview+DrawableLayout实现侧边栏效果、简单易懂_layout 侧边栏c#-程序员宅基地

文章浏览阅读909次,点赞2次,收藏2次。效果图:1、Listview+DrawableLayout代码&lt;?xml version="1.0" encoding="utf-8"?&gt;&lt;android.support.v4.widget.DrawerLayout ="http://schemas.android.com/apk/res/android" android..._layout 侧边栏c#

王权富贵书评:《TensofFlow深度学习应用实践》(王晓华 著)一书-程序员宅基地

文章浏览阅读615次。这本书能不要买就不要买了,就是本烂书,例子对不上,程序的缩进对不上,例子还各种报错。推荐指数:负****************************** PS:气...