flink写入hbase-程序员宅基地

技术标签: flink  hbase  

参考:  https://www.cnblogs.com/swordfall/p/10527423.html

flink 流处理写入数据到hbase. 采用的是批量写入(500条数据写入一次)。

 

HBaseWriter.java

package com.flink;

import com.flink.model.DeviceData;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 *
 * 写入HBase
 * 继承RichSinkFunction重写父类方法
 *
 * 写入hbase时500条flush一次, 批量插入, 使用的是writeBufferSize
 */
class HBaseWriter extends RichSinkFunction<DeviceData>{
    private static final Logger logger = LoggerFactory.getLogger(HBaseWriter.class);

    private static org.apache.hadoop.conf.Configuration configuration;
    private static Connection connection = null;
    private static BufferedMutator mutator;
    private static int count = 0;

    @Override
    public void open(Configuration parameters) throws Exception {
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.master", "192.168.3.101:60020");
        configuration.set("hbase.zookeeper.quorum", "192.168.3.101");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            connection = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("t1"));
        params.writeBufferSize(2 * 1024 * 1024);
        mutator = connection.getBufferedMutator(params);
    }

    @Override
    public void close() throws IOException {
        if (mutator != null) {
            mutator.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(DeviceData values, Context context) throws Exception {
        //Date 1970-01-06 11:45:55  to 445555000
        long unixTimestamp= 0;
        try {
            String gatherTime = values.GatherTime;
            //毫秒和秒分开处理
            if (gatherTime.length() > 20) {
                long ms = Long.parseLong(gatherTime.substring(20, 23));
                Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(gatherTime);
                unixTimestamp = date.getTime() + ms;
            } else {
                Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(gatherTime);
                unixTimestamp = date.getTime();
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
        String RowKey = values.MachID + String.valueOf(unixTimestamp);
        String Key = values.OperationValue;
        String Value = values.OperationData;
        System.out.println("Column Family=f1,  RowKey=" + RowKey + ", Key=" + Key + " ,Value=" + Value);
        Put put = new Put(RowKey.getBytes());
        put.addColumn("f1".getBytes(), Key.getBytes(), Value.getBytes());
        mutator.mutate(put);
        //每满500条刷新一下数据
        if (count >= 500){
            mutator.flush();
            count = 0;
        }
        count = count + 1;
    }
}

Main.java

//写入hbase
dataStream.addSink(new HBaseWriter());
DeviceData.java
package com.flink.model;

/**
 * 设备数据的数据结构
 */
class DeviceData {
    String compID;
    String machID;
    String Type;
    String gateMac;
    String operationValue;
    String operationData;
    String gatherTime;
}

 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u012447842/article/details/90203512

智能推荐

邻接矩阵-建立图-程序员宅基地

文章浏览阅读358次。1.介绍图的相关概念  图是由顶点的有穷非空集和一个描述顶点之间关系-边(或者弧)的集合组成。通常,图中的数据元素被称为顶点,顶点间的关系用边表示,图通常用字母G表示,图的顶点通常用字母V表示,所以图可以定义为:  G=(V,E)其中,V(G)是图中顶点的有穷非空集合,E(G)是V(G)中顶点的边的有穷集合1.1 无向图:图中任意两个顶点构成的边是没有方向的1.2 有向图:图中..._给定一个邻接矩阵未必能够造出一个图

IAR调试程序闪退问题_iar在debug时停止工作-程序员宅基地

文章浏览阅读1.7k次。问题:IAR调试STM32程序,点击调试按钮后软件自动关闭,并弹出报错提示框解决:将调试的接口模式改为SWD模式即可。我的原先设置为JTAG模式。_iar在debug时停止工作

100M宽带是多少网速_100m的宽带网速是多少兆-程序员宅基地

文章浏览阅读742次。100兆宽带的网速通常指的是每秒可以传输的数据量为100兆比特(Mb)。在此情况下,1兆比特(Mb)等于100万比特(Mbps),而1字节(B)等于8比特(bps)。因此,100兆宽带的网速可以计算如下:100兆比特/秒=100/8 兆字节/秒= 12.5兆字节/秒所以,100兆宽带的网速约为12.5MBps(兆字节/秒),也可以说为100Mbps(兆比特/秒)。但是需要注意的是,实际的下载和上传速度可能受到各种因素的影响,如网络拥堵、设备性能等。因此,实际使用中您可能会感受到较低的速度。_100m的宽带网速是多少兆

Windows 7 通用 CDC 串口驱动程序_cdcserial驱动 win7-程序员宅基地

文章浏览阅读2.4w次,点赞13次,收藏44次。Windows 7 通用 CDC 串口驱动程序Windows 7 自带 CDC 串口类设备的驱动程序文件 usbser.sys,所缺的是驱动配置文件 usbser.inf 文件,将 Windows 10 的 usbser.inf 文件拷贝到 Windows 7,注释掉 SourceDisksNames 和 SourceDisksFiles 部分就可以作为 Windows 7 的 CDC 串口类..._cdcserial驱动 win7

AI遮天传 NLP-词表示_nlp中词语的表示-程序员宅基地

文章浏览阅读2.5k次,点赞53次,收藏51次。NLP-词表示_nlp中词语的表示

sed 替换多个空格为一个-程序员宅基地

文章浏览阅读2.4k次。sed -i 's/[ ][ ]*/ /g' file.txt _sed 多个空格替换为1个

随便推点

什么是算法?-程序员宅基地

文章浏览阅读1.7w次,点赞15次,收藏129次。算法(algorithm)是解决一系列问题的清晰指令,也就是,能对一定规范的输入,在有限的时间内获得所要求的输出。 简单来说,算法就是解决一个问题的具体方法和步骤。算法是程序的灵 魂。二、算法的特征1.可行性 算法中执行的任何计算步骤都可以分解为基本可执行的操作步,即每个计算步都可以在有限时间里完成(也称之为有效性) 算法的每一步都要有确切的意义,不能有二义性。例如“增加x的值”,并没有说增加多少,计算机就无法执行明确的运算。 _算法

【网络安全】网络安全的标准和规范_网络安全标准规范-程序员宅基地

文章浏览阅读1.5k次,点赞18次,收藏26次。网络安全的标准和规范是网络安全领域的重要组成部分。它们为网络安全提供了技术依据,规定了网络安全的技术要求和操作方式,帮助我们构建安全的网络环境。下面,我们将详细介绍一些主要的网络安全标准和规范,以及它们在实际操作中的应用。_网络安全标准规范

Windows上的巧克力味Chocolatey详解_chocolate怎么卸载-程序员宅基地

文章浏览阅读1.5k次。Chocolatey是什么?很简单,Chocolatey就是Windows系统的yum或apt-get。一、Chocolatey介绍Chocolatey是一款专为Windows系统开发的、基于NuGet的包管理器工具,类似于Node.js的npm,MacOS的brew,Ubuntu的apt-get,它简称为choco。Chocolatey的设计目标是成为一个去中心化的框架,便于开发_chocolate怎么卸载

关于Python的三个谎言,别再盲目学Python了_关于python 盲目-程序员宅基地

文章浏览阅读2.3w次,点赞177次,收藏741次。Python作为21世纪最火的编程语言,市面上各种学习视频层出不穷,关于Python的学习氛围也逐渐浓厚,Python固然简单好上手,但事实上Python也不是那么容易学习的。如果不采取正确的学习方式,很容易走入误区。关于Python的三个谎言,你一定要清楚。1: 学完Python,并不能立马拿一两万的工资,甚至可能找不到工作!2:Python也没有那么简单,不是有手就行!3:别想着1个星期、2个星期就能学会,你至少得腾出一两个月来连续学习!如果你还是执意要学Python,那么好,接下来我们看看怎._关于python 盲目

js 实现将json数据导出到excel表格-程序员宅基地

文章浏览阅读2.1k次。方法一将table标签,包括tr、td等对json数据进行拼接,将table输出到表格上实现,这种方法的弊端在于输出的是伪excel,虽说生成xls为后缀的文件,但文件形式上还是html,代码如下<html><head> <p style="font-size: 20px;color: red;">使用table标签方式将json导出xls文件</p..._如何把js数据转换成表格

IEEE协会会员权益,注册IEEE会员有必要了解下_ieee会员好处-程序员宅基地

IEEE协会是一个专注于航空与电子系统领域的组织,注册IEEE会员可以享受许多权益,包括免费访问协会资源中心和参加各种会议及活动。