zmq pub/sub使用详解_zmq pub sub-程序员宅基地

技术标签: pub/sub  并发发布  详解  zmq  

关于zmq的基本简介,请参考ZeroMQ基础入门

pub/sub模式介绍

发布/订阅模式,全称为Publish/Subscribe,支持多个发布者/多订阅者,使用在消息单向传输的应用场景,消息总是从发布者发送到订阅者。

一般的使用流程为:

pub/sub

pub端:

  • 创建context
  • 创建socket,设置ZMQ_PUB模式
  • bind端口
  • 循环发布消息send

socket特性:

特性
兼容的对端套接字 ZMQ_SUB
方向性 单向
发送/接收模式 仅发送
进入路由策略
流出路由策略 扇出(呈扇形发出)
ZMQ_HWM 选项行为 丢弃

sub端:

  • 创建context
  • 创建socket,设置ZMQ_SUB模式
  • connect到pub端
  • setsockopt设置ZMQ_SUBSCRIBE订阅的消息
  • 循环接收recv
特性
兼容的对端套接字 ZMQ_PUB
方向性 单向
发送/接收模式 仅接收
进入路由策略 平等排队
流出路由策略
ZMQ_HWM 选项行为 丢弃
注意事项

  • pub端socket不能使用recv函数,同样,sub端不能使用send函数
  • 当pub端由于到达了高水位而使ZMQ_PUB套接字进入静默模式的时候,所有发送到这个有问题的订阅者的消息都会被丢弃,直到静默模式终止
  • pub端socket的zmq_send()函数永远不会阻塞
  • sub端刚创建socket后是无法订阅到任何消息的,必须使用setsockopt设置订阅的消息后才能接收到
  • sub端是根据参数前缀进行过滤的。订阅的内容以pub端发出的内容从头开始匹配为过滤条件,完全匹配订阅内容的消息会被sub接收,如订阅内容为"a"时,所有以a开头的消息都会接收
  • 当订阅内容为"",长度为0时为订阅所有内容,因为所有消息都匹配成功
  • 如果存在某个pub没有被任何sub连接,则该pub会丢弃所有的消息
  • 如果采用tcp的连接方式,sub很慢时,消息将会堆积在pub,可以通过设置ZMQ_HWM选项来保护发布者,对于重要的消息,也可以写入硬盘等待发送
  • pub和sub谁bind谁connect并无严格要求(虽本质并无区别),但仍建议pub使用bind,sub使用connect,在实际测试中,使用sub绑定而pub connect时,sub端无法接收到消息
  • 一个显著的问题是,“slow joiner”可能导致发布者的第一笔消息总是丢失,下文会进一步说明该问题
  • 一个订阅者(subcriber)可以链接超过一个发布者(publisher)。数据到达后将交叉存取(公平队列),以保证发布者之间的数据不会淹没
  • 从ZeroMQ v3.x版本开始,使用tcp://或者ipc://协议连接时会在发布者进行消息过滤,使用epgm://协议仍在订阅者过滤;在ZeroMQ v2.x,所有消息过滤都发生在订阅者
关于slow joiner问题

一般描述为,即使是先启动订阅者,再启动发布者,订阅者也有可能会丢失前一部分数据。你无法得知SUB是何时开始接收消息的。

这是因为订阅者向发布者建立连接也是耗费时间的,虽然时间极短,但不为0。这个时间内发布者发布的内容将没有订阅者能够接收。

几种处理方式:

  • 不处理

对于可以容忍数据丢失的应用来说,不必理会丢失的数据。比如接收天气信息的应用,你可能只关注最新的天气信息。

  • 使用延时发布策略

因为数据丢失的原因是发布者在没有稳定接收者的情况下仍然发送了数据,所以可以让发布者等待一段时间再发送数据。

最简单的方式是sleep一段时间。

缺点是:

  1. 不知道sleep多久合适
  2. 即使sleep也不能保证sub者一定就准备好了
  3. 在程序中加sleep不是一种优雅的设计
  • 通知发布者模式

发布者可以在确保订阅者已经启动成功的情况下再发送数据,只需要订阅者在准备好后通知发布者。

这个不难实现,订阅者在准备好后,首先使用req/rep模式向发布者发送一个特定的请求,发布者接收请求并应答,然后再发布真正的数据。

这种方式增加了简单一步操作,但保证了数据的完整。

多线程发送问题

注意:context是线程安全的,但socket非线程安全。在多个线程中使用同一个socket会导致程序崩溃(不提倡使用锁,它会导致竞争并减慢性能,不符合zmq的设计理念)。

  1. 使用proxy

一种比较常见的场景是,发布者使用多个线程来发布不同的数据,所有的数据通过一个endpoint发送。

这与动态发现问题类似,对于一个应用场景,可能会随时增加发布者或者订阅者,构建一个合适的系统可以减小编码和出错的机会。

在这里插入图片描述

如图所示,使用一个proxy可以轻松解决这个问题。

  • 增加了中间层。中间层是静态的,它不会经常变动
  • pub可以随意增减,它们都connect到xsub
  • sub也可以随意增减,它们都connect到xpub
  • xsub收到的所有数据会通过proxy转发到xpub,从而发布到各sub

对于这种情况,可以在多个发布线程中分别创建socket,connect到xsub,从而避免多线程共用socket。

这种情况下,当发布线程较多时,会导致socket堆积,最终导致系统文件描述符过多而失败。可以使用线程池,每个线程使用自己的socket。

还有一种使用代理的情况是:

在这里插入图片描述

这里proxy类似于一个桥,连接了两个不同的网络。也可以作为协议网关,用于连接两个使用不同协议的网络。

结合天气服务,实现一个Proxy的例程如下:

//
//  Weather proxy device C++
//
// Olivier Chamoux <[email protected]>
//

#include "zhelpers.hpp"

int main (int argc, char *argv[])
{
    
    zmq::context_t context(1);

    //  This is where the weather server sits
    zmq::socket_t frontend(context, ZMQ_SUB);
    frontend.connect("tcp://192.168.55.210:5556");

    //  This is our public endpoint for subscribers
    zmq::socket_t backend (context, ZMQ_PUB);
    backend.bind("tcp://10.1.1.0:8100");

    //  Subscribe on everything
    frontend.setsockopt(ZMQ_SUBSCRIBE, "", 0);

    //  Shunt messages out to our own subscribers
    while (1) {
    
        while (1) {
    
            zmq::message_t message;
            int more;
            size_t more_size = sizeof (more);

            //  Process all parts of the message
            frontend.recv(&message);
            frontend.getsockopt( ZMQ_RCVMORE, &more, &more_size);
            backend.send(message, more? ZMQ_SNDMORE: 0);
            if (!more)
                break;      //  Last message part
        }
    }
    return 0;
}
  1. 使用push/pull

使用push/pull建立一个发布端模型,pull接收到所有数据通过proxy转到pub,再发送出去。

这种方式是使用xpub/xsub的变体。因为实际使用中,在单进程内使用sub绑定,pub连接的方式无法接收到数据,而push/pull是一个可行的替代。

启动代理:

// 代码片断
void StartPubProxy(string& port)
{
    
    try
    {
    
        // 面向client的socket,多线程发来所有数据
        zmq::socket_t frontend(m_ctx, ZMQ_PULL);
        frontend.bind("inproc://#0");

        // 面向services的socket,提供对外端口,并实际发布数据
        zmq::socket_t backend(m_ctx, ZMQ_PUB);
        string strBind = "tcp://*:" + port;
        backend.bind(strBind);

        // 创建proxy
        zmq::proxy(static_cast<void*>(frontend), static_cast<void*>(backend), nullptr);
    }
    catch(const std::exception& e)
    {
    
        std::cerr <<"zmq启动转发代理失败:" << e.what() << '\n';
    }
}

原发布线程向"inproc://#0"push数据即可。

  1. 使用boost asio单线程处理数据

在有些情况下,加锁的多线程未必比无锁单线程更快。

对于多线程发布模型来说,可以把它们要发送的数据通过strand.post到io_service的单线程队列里,由工作线程异步处理。

这种方式简单方便,关于使用asio创建线程的使用可以参考:boost::asio::io_service创建线程池简单实例

保护发布者

前方讲述,当sub端处理较慢时,pub端在到达高水位线后会丢弃数据。对于重要的应用,由于不能对sub端的性能作出任何假设,所以需要一定的策略来保证。

ZMQ_HWM

ZMQ_SNDHWM:对向外发送的消息设置高水位(最大缓存量),ZMQ_RCVHWM:对进入socket的消息设置高水位。

ZMQ_SNDHWM属性将会设置socket参数指定的socket对外发送的消息的高水位。高水位是一个硬限制,它会限制每一个与此socket相连的在内存中排队的未处理的消息数目的最大值。

0值代表着没有限制,默认值为1000,就在bind/connect之前设置该属性。如果设置为无限,可能会导致发布者崩溃。

如果已经到达了规定的限制,socket就需要进入一种异常的状态,表现形式因socket类型而异。socket会进行适当的调节,比如阻塞或者丢弃已发送的消息。

总是给套接字设置一个基于期望的订阅方数量的最大值,你打算用于队列的内存的数量,和一个消息平均大小的高水位线。例如,如果你希望有5000个订阅方,有1G的内存可有,消息平均200字节,那么一个安全的高水位线应该是(1000000000/200/5000)=1000.


参考资料

?MQ - The Guide
Unable to receive messages when binding subscriber socket
What is a simple example of a working XSUB / XPUB proxy in zeromq
How to implement Pub-Sub Network with a Proxy by using XPUB and XSUB in ZeroMQ(C++)?
Proxy with inproc frontend
ZMQ模式详解——发布/订阅模式
ZeroMQ基础入门

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/guotianqing/article/details/101429319

智能推荐

while循环&CPU占用率高问题深入分析与解决方案_main函数使用while(1)循环cpu占用99-程序员宅基地

文章浏览阅读3.8k次,点赞9次,收藏28次。直接上一个工作中碰到的问题,另外一个系统开启多线程调用我这边的接口,然后我这边会开启多线程批量查询第三方接口并且返回给调用方。使用的是两三年前别人遗留下来的方法,放到线上后发现确实是可以正常取到结果,但是一旦调用,CPU占用就直接100%(部署环境是win server服务器)。因此查看了下相关的老代码并使用JProfiler查看发现是在某个while循环的时候有问题。具体项目代码就不贴了,类似于下面这段代码。​​​​​​while(flag) {//your code;}这里的flag._main函数使用while(1)循环cpu占用99

【无标题】jetbrains idea shift f6不生效_idea shift +f6快捷键不生效-程序员宅基地

文章浏览阅读347次。idea shift f6 快捷键无效_idea shift +f6快捷键不生效

node.js学习笔记之Node中的核心模块_node模块中有很多核心模块,以下不属于核心模块,使用时需下载的是-程序员宅基地

文章浏览阅读135次。Ecmacript 中没有DOM 和 BOM核心模块Node为JavaScript提供了很多服务器级别,这些API绝大多数都被包装到了一个具名和核心模块中了,例如文件操作的 fs 核心模块 ,http服务构建的http 模块 path 路径操作模块 os 操作系统信息模块// 用来获取机器信息的var os = require('os')// 用来操作路径的var path = require('path')// 获取当前机器的 CPU 信息console.log(os.cpus._node模块中有很多核心模块,以下不属于核心模块,使用时需下载的是

数学建模【SPSS 下载-安装、方差分析与回归分析的SPSS实现(软件概述、方差分析、回归分析)】_化工数学模型数据回归软件-程序员宅基地

文章浏览阅读10w+次,点赞435次,收藏3.4k次。SPSS 22 下载安装过程7.6 方差分析与回归分析的SPSS实现7.6.1 SPSS软件概述1 SPSS版本与安装2 SPSS界面3 SPSS特点4 SPSS数据7.6.2 SPSS与方差分析1 单因素方差分析2 双因素方差分析7.6.3 SPSS与回归分析SPSS回归分析过程牙膏价格问题的回归分析_化工数学模型数据回归软件

利用hutool实现邮件发送功能_hutool发送邮件-程序员宅基地

文章浏览阅读7.5k次。如何利用hutool工具包实现邮件发送功能呢?1、首先引入hutool依赖<dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.19</version></dependency>2、编写邮件发送工具类package com.pc.c..._hutool发送邮件

docker安装elasticsearch,elasticsearch-head,kibana,ik分词器_docker安装kibana连接elasticsearch并且elasticsearch有密码-程序员宅基地

文章浏览阅读867次,点赞2次,收藏2次。docker安装elasticsearch,elasticsearch-head,kibana,ik分词器安装方式基本有两种,一种是pull的方式,一种是Dockerfile的方式,由于pull的方式pull下来后还需配置许多东西且不便于复用,个人比较喜欢使用Dockerfile的方式所有docker支持的镜像基本都在https://hub.docker.com/docker的官网上能找到合..._docker安装kibana连接elasticsearch并且elasticsearch有密码

随便推点

Python 攻克移动开发失败!_beeware-程序员宅基地

文章浏览阅读1.3w次,点赞57次,收藏92次。整理 | 郑丽媛出品 | CSDN(ID:CSDNnews)近年来,随着机器学习的兴起,有一门编程语言逐渐变得火热——Python。得益于其针对机器学习提供了大量开源框架和第三方模块,内置..._beeware

Swift4.0_Timer 的基本使用_swift timer 暂停-程序员宅基地

文章浏览阅读7.9k次。//// ViewController.swift// Day_10_Timer//// Created by dongqiangfei on 2018/10/15.// Copyright 2018年 飞飞. All rights reserved.//import UIKitclass ViewController: UIViewController { ..._swift timer 暂停

元素三大等待-程序员宅基地

文章浏览阅读986次,点赞2次,收藏2次。1.硬性等待让当前线程暂停执行,应用场景:代码执行速度太快了,但是UI元素没有立马加载出来,造成两者不同步,这时候就可以让代码等待一下,再去执行找元素的动作线程休眠,强制等待 Thread.sleep(long mills)package com.example.demo;import org.junit.jupiter.api.Test;import org.openqa.selenium.By;import org.openqa.selenium.firefox.Firefox.._元素三大等待

Java软件工程师职位分析_java岗位分析-程序员宅基地

文章浏览阅读3k次,点赞4次,收藏14次。Java软件工程师职位分析_java岗位分析

Java:Unreachable code的解决方法_java unreachable code-程序员宅基地

文章浏览阅读2k次。Java:Unreachable code的解决方法_java unreachable code

标签data-*自定义属性值和根据data属性值查找对应标签_如何根据data-*属性获取对应的标签对象-程序员宅基地

文章浏览阅读1w次。1、html中设置标签data-*的值 标题 11111 222222、点击获取当前标签的data-url的值$('dd').on('click', function() { var urlVal = $(this).data('ur_如何根据data-*属性获取对应的标签对象

推荐文章

热门文章

相关标签