技术标签: mapreduce hadoop big data 分布式 大数据
1.mapreudce流程:
官网:
(1)整体流程
(input) <k1, v1> -> map -> <k2, v2> -> reduce -> <k3, v3> (output)
(2)解释:
1)整个过程 跟 kv
数据类型=》 mr
2)kv:
1.The key and value classes have to be serializable
2.序列化=》 kv数据类型 实现 =》 Writable interface
3.k 的数据类型=》 WritableComparable 【序列化 + 排序 】
4.【mr 过程 是按照key 进行排序】
5.每个阶段的输出作为下一个阶段的输入
以wordcount为例:
(1)input [跟切片有关系] =》 “文件里的数据 一行一行读取”
a,a,a
b,b,b
=>
k:line号的偏移量
v: 每行的数据
(offset1,"a,a,a")
(offset1,"b,b,b")
【input】
(1)【大部分场景下 一个文件 一个task 处理一个block 】
注意:
不是说每个文件都能进行split(分片):
能分片: 200m =》128m + 剩余的部分 =》 2个task
不能分片: 1T =》 只有一个task 处理
(2)map: 一一映射 =》 y=f(x)
(offset1,"a,a,a")
(offset1,"b,b,b")
f(x) :按照分割符进行拆分 每个单词赋值成1
(offset1,"a,a,a") =》 "a,a,a".split(",")
=>
(a,1)(a,1)(a,1)
(b,1)(b,1)(b,1)
(3)reduce:归约 聚合 + 函数 =》
1.按照某一个规则 把key 拉倒一起 【分组 聚合】
2.“做一些事情” 【函数】
(a,1)(a,1)(a,1)
(b,1)(b,1)(b,1)
=》 “分组” shuffle [k,v是一个集合 ]
a,<1,1,1>
b,<1,1,1>
=> "函数"
a,3
b,3
2.mapreudce 作业 =》 yarn
app master => driver [mian ]
maptask => mapper阶段的代码
reduce task =》 reduce 阶段的代码
mian{
mapper
reduce
driver => map + reduce => yarn
}
8.mapper阶段:
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
(input)
KEYIN:输入数据key的数据类型
VALUEIN: 输入数据value的数据类型
map:
KEYOUT:输出数据key的数据类型
VALUEOUT 输出数据value的数据类型
方法:
setup 每个maptask 开始的时候 执行一次
cleanup每个maptask 结束的时候 执行一次
3.reducer阶段
KEYIN,VALUEIN,KEYOUT,VALUEOUT
(map)
KEYIN:输入数据key的数据类型
VALUEIN: 输入数据value的数据类型
reduce:
KEYOUT:输出数据key的数据类型
VALUEOUT 输出数据value的数据类型
注意:
map阶段的输出作为reduce阶段的输入
4.kv的数据类型:
mr:
k: Text
v:IntWritable
java:
String =>Test
int => IntWritable
double => doubleWritable
null => NUllWritable
kv => Writable
k=》 WritableComparable
k:IntWritable
5.提交服务器运行代码:
hadoop jar \
/home/hadoop/project/mapreduce/bigdata-hdfs-1.0.jar \
com.bigdata.mapreduce.WordCount \
/data/wc.data /out-wc
6.需求: mr 统计每个手机号的上行总流量、下行总流量 、总流量
mr:
mapper:
phone up down
key:phone
value :up down
reduce:
key:phone => 分组
value: <(up down)...>
key:phone => 分组
value:up_totol down_totol totol
table:
phone up down
sql:
维度:phone
指标:上行总流量、下行总流量 、总流量
select
phone,
up_totol,
down_totol,
(up_totol+ down_totol) as totol
from
(
select
phone,
sum(up) as up_totol,
sum(down) as down_totol
from log
group by
phone
) a ;
7.sql:
group by 分组
distinct 去重
join 表关联
order by 排序
parition表 分区:
eg;
517 => /0517/*
(1)order by 排序
需求;
phone uptotl downtotol totol
分析:
mr :
map:
k =》totol
v: phone uptotl downtotol
reduce:
k :
phone uptotl downtotol totol
(2)parition表 分区
1)处理好的文件 文件生成的个数由谁决定的?reducetask 个数由谁决定的?
reducetask 个数由谁决定的?
a.开发人员决定的
b.默认:reducetask 是1个
2)文件生成的个数由谁决定的?
reducetask 个数
3)reducetask 要求:
a.reducetask 个数 >= 分区数 会有空文件
b,reducetask 个数 < 分区数 报错
c.reducetask 个数 1 没有问题
(3)distinct
需求: mr 对数据进行去重
sql: name
select name from table group by name ;[去重]
(4)group by 分组
需求:
sql =》 mr
select deptno ,count(1) as cnt from emp group by deptno;
input
处理 :
mapper:
all =>
deptno
(20,1),(20,1)
reducer:
20,<1,1,1>
聚合统计
20,3
ouput :
1.file
2.mysql
8.切片机制
input =》 数据加载
file
db
InputFormat:
数据源类型
OutputFormat:
数据存储类型
思考:1.maptask个数由什么决定的?
分片的个数
2.什么是分片?(split)
和块有点像
block:
1.文件拆分 形成的block
blocksize =》 128m
split:
1.加载数据 形成的切片
splitsize =》blocksize 128m
3.一个文件加载的时候 会形成多少个切片?
前提:
(1)文件可以被分割
(2)文件不可以被分割
切片的个数 就是一个
文件是否可以被分割?
(1)文件存储格式 :
text、orc、parquet =》 支持分割
(2)文件存储的压缩格式:
有的可以分割的 有的不可以
4.分片的个数如何计算?
(1)文件不可被分割:
就是一个分片
(2)文件可被分割:
1)文件大小 小于 128m
一个文件对应一个切片 一个maptask 【 文件大小 小于 切片的大小】
2)文件大小 大于 128m
filesize /128 = num 切片个数
剩余部分= filesize - num*128
剩余部分 比较大小 splitssize*10%=128*10% = 12.8m
大于等于 开启一个maptask
小于 不会开启maptask 【剩余的部分 合并到之前的切片中】
案例:
130m 文件 问几个切片?
130/128 = 1
130-128*1=2m<12.8m
160m 文件 几个切片 ?
160/128 = 1
160-128*1 = 32m
32m>12.8m => 1
1+1 =2
5.为什么要知道切片的个数呢?
切片的个数 =》 maptask的个数
=》 yarn
一个task 申请一个container 【cpu、mem】
6.wc文件 137m
137/128 =1
137-128*1=9>12.8
idea => local => 切片大小 是128???
本地 32m
137/32=4
9>3.2 =>1
背景电信卡返利系统中,核心功能就是按照用户的维度,统计用户已经销售的卡的充值金额和返利金额,并区分出返利是否已经支付,最后按照未支付的返利金额为客户支付返利。之前是希望直接系统自动统计,支付完成后,在网页上选中,修改支付状态,生成返利支付订单。但是由于查询速度较慢,因此暂时改为了我在数据库中手动查询,然后导出查询记录,邮件发送给财务人员。然而,当前在号码充值订单增加到了 近 20k 条记录后,统计...
buildNumber=$(/usr/libexec/PlistBuddy -c "Print :CFBundleVersion" "${PROJECT_DIR}/${INFOPLIST_FILE}")shortVersion=$(/usr/libexec/PlistBuddy -c "Print :CFBundleShortVersionString" "${PROJECT_DIR}/${INF...
最近学习python的爬虫框架scrapy。官方给的安装文档真心坑爹,自己安装的时候真心不容易啊,一定要记录下来,以备查阅。如果有哪些盆友遇到这样问题,拿走,不谢。笔者环境:用oracle VM virtualVox装的虚拟机,系统是centos6.5,能连互联网。前提准备Python 2.7Pip和setuptools python 包。现在安装pip会连带着安装setuptoolsLxmlOp...
Jmeter概述Jmerter简介Apache Jmeter是Apache组织的开源项目,是一个纯Java桌面应用,用于压力测试和性能测试。它最初设计用于Web应用测试后来扩展到其他测试领域。Jmeter功能能够对HTTP和FTP服务器进行压力和性能测试,也可以对任何数据库进行同样的测试(通过JDBC)。完全的可移植性和100%纯java。完全Swing(ja...
形如$2^{P}-1$的素数称为麦森数,这时P一定也是个素数。但反过来不一定,即如果PPP是个素数,$2^{P}-1$不一定也是素数。到1998年底,人们已找到了37个麦森数。最大的一个是P=3021377,它有909526位。麦森数有许多重要应用,它与完全数密切相关。
基于LDA的Topic Model变形最近几年来,随着LDA的产生和发展,涌现出了一批搞Topic Model的牛人。我主要关注了下面这位大牛和他的学生:David M. BleiLDA的创始者,04年博士毕业。一篇关于Topic Model的博士论文充分体现其精深的数学概率功底;而其自己实现的LDA又可体现其不俗的编程能力。说人无用,有论文为证:J. Chang and D. B
一、作用 jenkins默认一个项目只能对应一个git分支来构建,通过git Parameter插件可以实现选择分支或tag来构建项目。二、安装插件 搜索git Parameter并安装。三、配置项目 General 源码管理 在Branch Specifier (blank for 'any')中输入刚...
之前做的一个iPad上的app,在最新的sdk下重新编译,出现无法旋转的情况。查看了一些文档,由于iOS6在旋转处理的api有了变化,之前处理旋转的函数shouldAutorotateToInterfaceOrientation:在iOS6下无效,由supportedInterfaceOrientations 和 shouldAutorotate替代。于是在对应的viewcontroller上
OGIS规范定义的几何对象定义Curve:A Curve is a 1-dimensional geometric object usually stored as a sequence of Points, with the subtype of Curve specifying the form of the interpolation between Points. This...
1、OA系统中资产管理的定义广义的资产管理定义: 属于经营过程中用来改变或者影响劳动对象的劳动资料,是固定资本的实物形态。这里我们探讨的是组织内部的资产管理课题,关注资产的全生命周期管理,即利用有限的人力、财力、时间等资源完成资产从增加、扩建、调拨至报废的全生命周期管理,使资产在其生命周期内实现价值最大化。在这个过程中,与财务软件中的资产管理注重资产的价值、价格、损耗、折旧管理不同,OA系统...
本文实例讲述了Python多进程分块读取超大文件的方法。分享给大家供大家参考,具体如下:读取超大的文本文件,使用多进程分块读取,将每一块单独输出成文件# -*- coding: GBK -*-import urlparseimport datetimeimport osfrom multiprocessing import Process,Queue,Array,RLock"""多进程分块读取文件...
程序员本来就很辛苦了,但看不懂英语文档更让一个程序员更加头疼。英语对一个程序员来说还是很重要的,英语的学习需要日积月累,不怕大家笑话,我初中时的英语还是杠杠的,记得中考,我英语一题没有错,就作文扣了两分,也许是初中的英语简单吧,也许是高中没有努力学习英语,不知道哪天,英语就突然间陨落了,就在也好不起来了,大学时考四级,还有点底子,一次性考过了,后来的六级就没有这么幸运了,考了5次都没有过,5次啊,...