发布订阅服务端
概念
**发布订阅模式(Publish-Subscribe Pattern)**是一种消息传递模式,其中发布者发布消息,而订阅者接收和处理这些消息。它是一种松耦合的通信方式,允许发布者和订阅者在不知道彼此存在的情况下进行通信。
发布订阅模式的原理基于消息队列或主题,发布者将消息发布到特定的消息队列或主题中,而订阅者可以订阅这些消息队列或主题以接收和处理消息。发布者和订阅者之间的通信是异步的,这意味着发布者发布消息后,订阅者可以在任何时候接收和处理消息。
发布订阅模式的核心思想是将发布者和订阅者解耦,使得它们可以独立地运行和扩展。这种解耦有助于提高系统的灵活性和可伸缩性,因为发布者和订阅者可以根据需要进行扩展和修改,而不会影响彼此的操作。
发布订阅模式具有以下优势:
- 解耦:发布者和订阅者是松耦合的,它们可以独立地运行和扩展,而不会相互影响。
- 灵活性:发布者可以随时发布消息,而订阅者可以随时订阅和取消订阅消息,这使得系统更加灵活。
- 可伸缩性:发布者和订阅者可以根据需要进行扩展和修改,而不会影响彼此的操作。
- 异步通信:发布者和订阅者之间的通信是异步的,这意味着发布者发布消息后,订阅者可以在任何时候接收和处理消息。
- 消息过滤:订阅者可以根据自己的需求订阅特定的消息,从而实现消息过滤。
- 可靠性:发布订阅模式通常使用消息队列或主题来存储消息,这可以确保消息不会丢失,并且可以在订阅者不可用时进行存储。
- 分布式系统:发布订阅模式可以在分布式系统中使用,从而实现跨节点的通信。
VSOA 服务端的数据发布分为“普通发布方式”和“快速发布方式”。当服务端需要大量高频的发布数据时,可以使用快速发布的方式。快速发布进行大量高频的数据发布无法保证所有数据都能有效的到达,因此客户端在接收时可能会存在数据的丢失,使用快速发布的条件是这些丢失的数据并不影响程序的实际效果。若数据的变化是离散的而非连续的,使用快速发布可能会存在弊端。
在 VSOA 的底层实现中,普通发布方式基于 TCP,快速发布方式基于 UDP,当有大批量快周期更新数据的传输需求时,应当使用快速发布方式,否则 TCP 的丢包会因恢复时间过长导致实时性下降。
源码
这里设计另一个数据周期性发布的立场,数据是一个模拟的陀螺仪。当有客户端订阅该服务时,服务线程每秒发布一次数据。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_platform.h"
#include "vsoa_server.h"#define MY_SERVER_ADDR "0.0.0.0"
#define MY_SERVER_PORT (4002)
#define MY_SERVER_NAME "{\"name\":\"axis_server\"}"
#define MY_SERVER_PASSWD "123456"#define AXIS_SER_BUF_LEN 100static void *publish_axis_thread (void *arg)
{vsoa_url_t url;vsoa_payload_t payload;int roll = 1, pitch = 1, yaw = 1;vsoa_server_t *server = arg;char param[AXIS_SER_BUF_LEN + 1];url.url = "/axis";url.url_len = strlen(url.url);payload.data = NULL;payload.data_len = 0;payload.param = param;roll = 1;pitch = 1;yaw = 1;while (TRUE) {sleep(1);if (!vsoa_server_is_subscribed(server, &url)) {continue;}payload.param_len = snprintf(param, AXIS_SER_BUF_LEN,"{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",roll++, pitch++, yaw++);vsoa_server_publish(server, &url, &payload);}return (NULL);
}int main (int argc, char **argv)
{vsoa_server_t *server;/** 创建服务端*/server = vsoa_server_create(MY_SERVER_NAME);if (!server) {fprintf(stderr, "Can not create VSOA server!\n");return (-1);}/** 设置密码,设置为NULL,表示密码为空,客户端可以不输入密码*/vsoa_server_passwd(server, MY_SERVER_PASSWD);/** 启动微服务*/struct sockaddr_in addr;bzero(&addr, sizeof(struct sockaddr_in));addr.sin_family = AF_INET;addr.sin_port = htons(MY_SERVER_PORT);addr.sin_addr.s_addr = inet_addr(MY_SERVER_ADDR);addr.sin_len = sizeof(struct sockaddr_in);if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {vsoa_server_close(server);fprintf(stderr, "Can not start VSOA server!\n");return (-1);}/** Create publish thread*/pthread_t pub_threadid;pthread_create(&pub_threadid, NULL, publish_axis_thread, server);/** 进入监听事件循环*/while (1) {int cnt;int max_fd;fd_set fds;struct timespec timeout = {1, 0 };FD_ZERO(&fds);max_fd = vsoa_server_fds(server, &fds);cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);if (cnt > 0) {vsoa_server_input_fds(server, &fds);}}return (0);
}
如果要使用快速发布,只需要使用vsoa_server_quick_publish 函数替换vsoa_server_publish 函数即可。
执行
执行上面的发布服务程序,并用vMessenger 来模拟客户端来订阅服务。可以发现在没有订阅时,服务端是不对外发布的。
服务端执行情况如下:
客户端执行情况如下: