ZMTP协议

ZoreMQ Transport Protocol是一个传输层协议,用于ZMQ的连接的信息交互,本文档描述的是3.0协议,主要分析基于NULL Security Mechanism


协议语法

ZMTP由三部分组成,分别是 greeting、handshake、traffic

部分描述构成
greeting描述ZMQ版本、安全机制等signature + version + mechanism + as-server + filler
handshake描述端类型,如 PUB/SUB,REQ/REP一个或多个command
traffic命令或者消息command

ZMTP Wireshark 抓包

WireShark 默认不提供ZMTP解析插件,需要自己配置,步骤如下:

插件仓库:https://github.com/whitequark/zmtp-wireshark

下载插件:https://github.com/whitequark/zmtp-wireshark/blob/master/zmtp-dissector.lua

将插件zmtp-dissector.lua放到WireShark安装目录,比如我的是:C:\Program Files\Wireshark

修改C:\Program Files\Wireshark\init.lua,在文件末尾添加

dofile(DATA_DIR.."zmtp-dissector.lua")

对基于TCP端口通讯ZMQ进行抓包,例如端口为7380,将该端口Decode As ZMTP

8607ab1ed5f523a6bbc4491ec74b924f.png
解析接如下

1b1998d082deac0fe4c96b7aafb00f94.png

greeting

greeting 固定64个字节大小,下面将依次介绍每个部分。

signature

固定10字节大小,固定值为ff 00 00 00 00 00 00 00 01 7f;

signature可以用来校验链接是否为ZMQ链接,连续读取10个字节,判断开头是否为0xff,结尾是否为0x7f

version

固定2字节大小,格式为{major_version, minor_version}3.0 协议则为03 00,实际编码过程中只会校验major_version;

mechanism

固定20字节大小,这里只介绍NULL Security Mechanism,也就是不校验,其值为NULL,剩余以内容填充0;

as-server

固定一个字节大小,0x00 或者 0x01 ,当mechanism为NULL时候,as-server必须为0

filler

填充greeting至64个字节。

抓包示意

由Wireshark解析过后的协议。

6bd633daef76a532eb2eed806a8e6485.png

Frame

greeting之后的所有数据格式都为Frame,包含commandmessage

frame的格式如下:

Frame = Flag + Payload Length + Payload

抓包示意如下
在这里插入图片描述

  • Flag
    Flag 为1字节大小,每位代表不同的意思,参考抓包解释
    在这里插入图片描述
    低1位:表示是否有更多Frame,这里用于ZMQ中sendmore属性
    低2位:表示长度是否为8字节长度,否则为1字节长度
    低3位:表示当前frame是否为Command
    其他:保留,为0

  • Payload Length
    数据长度,可以为1字节或者8字节大小,根据Flag中的标志位决定

  • Payload
    实际的数据,大小为Payload Length

handshake

此阶段用来交换对端的READY命令以及metadata,主要包含对端的类型。handshake本质是Command,为Frame的一种。
NULL Security Mechanism机制中,以PUB/SUB模式为例,handshake的数据如下:
在这里插入图片描述
Payload内容如下:

[1 byte] Command size + [n bytes]Command Name + [1 bytes]Metadata Key size + [n bytes]Metadata Key + [4 bytes]Metadata Value size + [n bytes]Metadata Value

使用Socket实现ZMQ SUB方法

代码如下:

//
//  ZMTP 3.0 debugging subscriber
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
#include <WinSock2.h>
#include <ws2tcpip.h>#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <assert.h>
#include <fcntl.h>
#include <cstdint>
#include <iostream>#pragma comment(lib, "ws2_32.lib")typedef struct
{uint8_t flags;     //  Must be zerouint8_t size;      //  Size, 0 to 255 byteuint8_t data[255]; //  Message data
} zmtp_msg_t;static void derp(char *s)
{perror(s);exit(1);
}static void tcp_send(int handle, void *buffer, size_t len)
{if (send(handle, (char *) buffer, len, 0) == -1)derp((char *) "send");
}static void tcp_recv(int handle, void *buffer, size_t len)
{printf(" - reading %d bytes: ", (int) len);fflush(stdout);size_t len_recd = 0;while (len_recd < len){size_t bytes = recv(handle, (char *) buffer + len_recd, len - len_recd, 0);if (bytes == 0)break; //  Peer has shutdownprintf(" [%d]", (int) bytes);fflush(stdout);if (bytes == -1)derp((char *) "recv");len_recd += bytes;}printf("\n");fflush(stdout);
}static void zmtp_recv(int handle, zmtp_msg_t *msg)
{tcp_recv(handle, (uint8_t *) msg, 2);tcp_recv(handle, msg->data, msg->size);
}static void zmtp_send(int handle, zmtp_msg_t *msg)
{tcp_send(handle, (uint8_t *) msg, msg->size + 2);
}//  This is the 3.0 greeting (64 bytes)
typedef struct
{uint8_t signature[10];uint8_t version[2];uint8_t mechanism[20];uint8_t as_server[1];uint8_t filler[31];
} zmtp_greeting_t;int main(void)
{puts("I: starting subscriber");WSADATA wsData;if (WSAStartup(MAKEWORD(2, 2), &wsData) != 0){std::cerr << "无法初始化Winsock" << std::endl;return 1;}//  Create TCP socketint peer;if ((peer = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)derp((char *) "socket");const char *serverIP   = "127.0.0.1";const int   serverPort = 5559;sockaddr_in serverAddress {};serverAddress.sin_family = AF_INET;serverAddress.sin_port   = htons(serverPort);if (inet_pton(AF_INET, serverIP, &(serverAddress.sin_addr)) <= 0){std::cerr << "无效的服务器IP地址" << std::endl;closesocket(peer);WSACleanup();return 1;}//  Keep trying to connect until we succeedputs("I: waiting for connection");while (connect(peer, reinterpret_cast<sockaddr *>(&serverAddress), sizeof(serverAddress)) == -1)Sleep(1);puts("I: connected OK");//  This is our greeting (64 octets)zmtp_greeting_t outgoing = {{0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F},{3, 0},{'N', 'U', 'L', 'L', 0},{0},{0}};//  Do full backwards version detection following RFC23//  Send first ten bytes of greeting to peertcp_send(peer, &outgoing, 10);//  Read first byte from peerzmtp_greeting_t incoming;tcp_recv(peer, &incoming, 1);uint8_t length = incoming.signature[0];if (length != 0xFF){puts("E: signature not valid (1)");closesocket(peer);exit(0);}//  Looks like 2.0+, read 9 more bytes to be suretcp_recv(peer, (uint8_t *) &incoming + 1, 9);if ((incoming.signature[9] & 1) != 1){puts("E: signature not valid (2)");closesocket(peer);exit(0);}//  Exchange major version numbersputs("I: signature valid, exchanging major versions");tcp_send(peer, (uint8_t *) &outgoing + 10, 1);tcp_recv(peer, (uint8_t *) &incoming + 10, 1);if (incoming.version[0] >= 3){//  If version >= 3, the peer is using ZMTP 3.0, so send//  rest of the greeting and continue with ZMTP 3.0.puts("I: peer is talking ZMTP 3.0");puts("I: sending rest of greeting...");tcp_send(peer, (uint8_t *) &outgoing + 11, 53);//  Get remainder of greeting from peerputs("I: waiting for greeting from peer...");tcp_recv(peer, (uint8_t *) &incoming + 11, 53);//  Do NULL handshake - send READY command//  For now, empty dictionaryputs("I: have full greeting from peer");zmtp_msg_t  ready = {0x04, 0x19};std::string data;data.push_back(0x05);data.append("READY");data.push_back(0x0b);data.append("Socket-Type");int         netByteOrderSize = htonl(3);const char *valueBytes       = reinterpret_cast<const char *>(&netByteOrderSize);data.append(valueBytes, sizeof(netByteOrderSize));data.append("SUB");memcpy(ready.data, data.c_str(), data.size());puts("I: sending READY");zmtp_send(peer, &ready);//  Now wait for peer's READY commandputs("I: expecting READY from peer");zmtp_recv(peer, &ready);//assert(memcmp(ready.data, "READY   ", 8) == 0);puts("I: OK! NULL security handshake completed");puts("I: send sub command");zmtp_msg_t subCmd {0x00, 0x01};subCmd.data[0] = 0x01;zmtp_send(peer, &subCmd);}else{puts("E: major version not valid");closesocket(peer);exit(0);}puts("I: READY, printing messages");while (true){zmtp_msg_t msg;zmtp_recv(peer, &msg);msg.data[msg.size] = 0;puts((char *) msg.data);}closesocket(peer);return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/117538.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

移动基站ip的工作原理

原理介绍 Basic Principle 先说一下概念&#xff0c;大家在不使用 WIFI 网络的时候&#xff0c;使用手机通过运营商提供的网络进行上网的时候&#xff0c;目前都是在用户端使用私有IP&#xff0c;然后对外做 NAT 转换&#xff0c;这样的情况就导致大家统一使用一些 IP 段进行访…

Unity中Shader的帧缓存区Clear(color+Z+stencil)

文章目录 前言一、什么是帧缓冲区二、片段运算三、随机扫描显示器&#xff08;可以按照自定义路径绘制帧&#xff09;四、光栅扫描显示器&#xff08;从左到右&#xff0c;从上到下&#xff0c;依次绘制&#xff09;五、缓冲的方式&#xff1a;单缓冲 和 双缓冲1、单缓冲2、双缓…

VBA中如何将if写到一行

在VBA中&#xff0c;可以使用以下两种方式来编写一行if语句&#xff1a; 使用三元运算符&#xff1a; Dim result As String result "Yes" If True Else "No"在这个例子中&#xff0c;如果条件为真&#xff0c;则result变量的值为"Yes"&#…

pycharm 打开Terminal时报错activate.ps1,因为在此系统上禁止运行脚本,并因此无法进入虚拟环境

pycharm 打开Terminal时报错activate.ps1&#xff0c;因为在此系统上禁止运行脚本&#xff0c;并因此无法进入虚拟环境 如下图所示&#xff1a; 网上说可以set_restrictFalse什么的&#xff0c;虽然也可但可能会降低电脑安全性&#xff0c;可以将下面的终端改为cmd.exe即可

SpringCloudAlibaba Gateway(三)-整合Sentinel功能路由维度、API维度进行流控

Gateway整合Sentinel ​ 前面使用过Sentinel组件对服务提供者、服务消费者进行流控、限流等操作。除此之外&#xff0c;Sentinel还支持对Gateway、Zuul等主流网关进行限流。 ​ 自sentinel1.6.0版开始&#xff0c;Sentinel提供了Gateway的适配模块&#xff0c;能针对路由(rou…

Linux centos7 bash编程——-求质数和

训练项目&#xff1a;使用函数求质数和。 定义一个函数IsPrime()&#xff0c;据此判断一个数是否为质数 由用户输入一个整数&#xff0c;求出比此数大的两个最小质数之和。 一、解决思路: 1.先在键盘上输入一个整数 2.求出比此数大的最小质数 3.再求出比此质数大的另一个…

2.3 Vector 动态数组(迭代器)

C数据结构与算法 目录 本文前驱课程 1 C自学精简教程 目录(必读) 2 Vector<T> 动态数组&#xff08;模板语法&#xff09; 本文目标 1 熟悉迭代器设计模式&#xff1b; 2 实现数组的迭代器&#xff1b; 3 基于迭代器的容器遍历&#xff1b; 迭代器语法介绍 对迭…

全民健康生活方式行动日,天猫健康联合三诺生物推出“15天持续测糖计划”

糖尿病是全球高发慢性病中患病人数增长最快的疾病&#xff0c;是导致心血管疾病、失明、肾衰竭以及截肢等重大疾病的主要病因之一。目前中国有近1.4亿成人糖尿病患者&#xff0c;科学的血糖监测和健康管理对于糖尿病患者来说至关重要。 在9月1日全民健康生活方式行动日前夕&am…

如何排查 Flink Checkpoint 失败问题?

分析&回答 这是 Flink 相关工作中最常出现的问题&#xff0c;值得大家搞明白。 1. 先找到超时的subtask序号 图有点问题&#xff0c;因为都是成功没失败的&#xff0c;尴尬了。 借图&#xff1a; 2. 找到对应的机器和任务 方法很多&#xff0c;这里看自己习惯和公司提供…

【网络编程上】

目录 一.什么是互联网 1.计算机网络的定义与分类&#xff08;了解&#xff09; &#xff08;1&#xff09;计算机网络的定义 &#xff08;2&#xff09;计算机网络的分类 ① 按照网络的作用范围进行分类 ②按照网络的使用者进行分类 2.网络的网络 &#xff08;理解&#xf…

探索内网穿透工具:实现局域网SQL Server数据库的公网远程访问方法

文章目录 1.前言2.本地安装和设置SQL Server2.1 SQL Server下载2.2 SQL Server本地连接测试2.3 Cpolar内网穿透的下载和安装2.3 Cpolar内网穿透的注册 3.本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4.公网访问测试5.结语 1.前言 数据库的重要性相信大家都有所了解&…

【UE 材质】常用向量运算节点——点积、叉积、归一化

目录 一、点积 二、叉积 三、归一化 一、点积 点积&#xff0c;也称为内积或数量积&#xff0c;是一种用于计算两个向量之间关系的操作。对于两个三维向量 A&#xff08;a1,a2,a3&#xff09;和 B(b1,b2,b3)&#xff0c;它们的点积可以用以下公式表示&#xff1a; ABa1​⋅…

【设计模式】Head First 设计模式——构建器模式 C++实现

设计模式最大的作用就是在变化和稳定中间寻找隔离点&#xff0c;然后分离它们&#xff0c;从而管理变化。将变化像小兔子一样关到笼子里&#xff0c;让它在笼子里随便跳&#xff0c;而不至于跳出来把你整个房间给污染掉。 设计思想 ​ 将一个复杂对象的构建与其表示相分离&…

实验室的服务器和本地pycharm怎么做图传

提前说一个 自认为 比较重要的一点&#xff1a; 容器中安装opencv&#xff0c;可以先试试用 apt install libopencv-dev python3-opencv 我感觉在图传的时候用的不是 opencv-python 而是ubuntu的opencv库 所以用 apt install 安装试一下 参考 远程调试 qt.qpa.xcb: coul…

大数据Flink(七十一):SQL的时间属性

文章目录 SQL的时间属性 一、Flink三种时间属性简介

windows11 利用vmware17 安装rocky9操作系统

下载相关软件和镜像 vmware17 下载 下载页面 Download VMware Workstation Pro ​ rocky8镜像下载 官网链接&#xff1a;Rocky Linux 下载页面 Download Rocky | Rocky Linux 点击Minimal下载 安装rocky9 选择镜像文件&#xff0c;点击下一步 点击下一步 启动虚拟机 选…

程序开发:构建功能强大的应用的艺术

程序开发是在今天的数字化时代中扮演重要角色的一项技术。通过编写代码&#xff0c;开发人员能创造出无数不同的应用&#xff0c;从简单的计算器到复杂的社交平台。电子商务应用、在线教育平台、医疗记录系统等&#xff0c;都重视程序开发的重要性&#xff0c;通过这其中的交互…

xml和json互转工具类

分享一个json与xml互转的工具类&#xff0c;非常好用 一、maven依赖 <!-->json 和 xm 互转</!--><dependency><groupId>org.dom4j</groupId><artifactId>dom4j</artifactId><version>2.1.3</version></dependency&g…

深度学习推荐系统(二)Deep Crossing及其在Criteo数据集上的应用

深度学习推荐系统(二)Deep Crossing及其在Criteo数据集上的应用 在2016年&#xff0c; 随着微软的Deep Crossing&#xff0c; 谷歌的Wide&Deep以及FNN、PNN等一大批优秀的深度学习模型被提出&#xff0c; 推荐系统全面进入了深度学习时代&#xff0c; 时至今日&#xff0c…

说说Flink on yarn的启动流程

分析&回答 核心流程 FlinkYarnSessionCli 启动的过程中首先会检查Yarn上有没有足够的资源去启动所需要的container&#xff0c;如果有&#xff0c;则上传一些flink的jar和配置文件到HDFS&#xff0c;这里主要是启动AM进程和TaskManager进程的相关依赖jar包和配置文件。接着…