【boost网络库从青铜到王者】第六篇:asio网络编程中的socket异步读(接收)写(发送)

文章目录

  • 1、简介
  • 2、异步写 void AsyncWriteSomeToSocketErr(const std::string& buffer)
  • 3、异步写void AsyncWriteSomeToSocket(const std::string& buffer)
  • 4、异步写void AsyncSendToSocket(const std::string& buffer)
  • 5、异步读void AsyncReadSomeToSocket(const std::string& buffer)
  • 6、异步读void AsyncReceiveToSocket(const std::string& buffer)
  • 7、总结

1、简介

本文介绍boost asio的异步读写操作及注意事项,为保证知识便于读者吸收,仅介绍api使用的代码片段。下一节再编写完整的客户端和服务器程序。

所以我们定义一个session类,这个session类表示服务器处理客户端连接的管理类

#pragma once
#ifndef __ASYNC_DEMO_H_2023_8_22__
#define __ASYNC_DEMO_H_2023_8_22__
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"class Session {
public:Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);bool Connect(boost::asio::ip::tcp::endpoint& ep);private:std::shared_ptr<boost::asio::ip::tcp::socket> socket_;std::shared_ptr<Buffer> send_buffer_;};#endif // !__ASYNC_DEMO_H_2023_8_22__

session类定义了一个socket成员变量,负责处理对端**(ip+端口)的连接读写,封装了Connect**函数:

#include"async_demo.h"Session::Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket):socket_(socket)
{send_buffer_ = nullptr;
}bool Session::Connect(boost::asio::ip::tcp::endpoint& ep) {socket_->connect(ep);return true;
}

这里只是简单意思一下,下面核心介绍异步读写api的使用。

2、异步写 void AsyncWriteSomeToSocketErr(const std::string& buffer)

在写操作前,我们先封装一个Buffer结构。用来管理要发送和接收的数据,该结构包含数据域首地址,数据的总长度,以及已经处理的长度(已读的长度或者已写的长度)

写了两个构造函数,两个参数的负责构造写节点,一个参数的负责构造读节点。

#pragma once
#include<iostream>//trv
const int RECVSIZE = 1024;
class Buffer {
public://发送消息协议//param 协议首地址,协议总长度Buffer(const char* msg,int32_t total_len):msg_(new char[total_len]),total_len_(total_len),cur_len_(0){memcpy(msg_, msg, total_len);}//接收消息协议//param 协议总长度,当前接收协议长度Buffer(int32_t total_len):total_len_(total_len),cur_len_(0){msg_ = new char[total_len];}~Buffer() {delete[] msg_;}char* GetMsg() {return msg_;}int32_t GetTotalLen() {return total_len_;}void SetTotalLen(int32_t total_len) {total_len_ = total_len;}int32_t GetCurLen() {return cur_len_;}void SetCurLen(int32_t cur_len) {cur_len_ = cur_len;}
private://消息协议的首地址char* msg_;//消息协议的总长度int32_t total_len_;//消息协议的当前发送长度 +上已经发送长度 = total_len (已经处理的长度(已读的长度或者已写的长度))int32_t cur_len_;
};

接下来为Session添加异步写发送数据操作和负责发送写数据的节点。

#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"class Session {
public:Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);bool Connect(boost::asio::ip::tcp::endpoint& ep);//异步写  这个异步写存在问题void AsyncWriteSomeToSocketErr(const std::string& buffer);void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);
private:std::shared_ptr<boost::asio::ip::tcp::socket> socket_;std::shared_ptr<Buffer> send_buffer_;
};#endif // !__ASYNC_DEMO_H_2023_8_22__

在这里插入图片描述

AsyncWriteSomeToSocketErr函数为我们封装的写操作,AsyncWriteSomeToSocketErr为异步写操作回调的函数,为什么会有三个参数呢,我们可以看一下asio源码:

  template <typename ConstBufferSequence,BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,std::size_t)) WriteTokenBOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>BOOST_ASIO_INITFN_AUTO_RESULT_TYPE_PREFIX(WriteToken,void (boost::system::error_code, std::size_t))async_write_some(const ConstBufferSequence& buffers,BOOST_ASIO_MOVE_ARG(WriteToken) tokenBOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))BOOST_ASIO_INITFN_AUTO_RESULT_TYPE_SUFFIX((async_initiate<WriteToken,void (boost::system::error_code, std::size_t)>(declval<initiate_async_send>(), token,buffers, socket_base::message_flags(0)))){return async_initiate<WriteToken,void (boost::system::error_code, std::size_t)>(initiate_async_send(this), token,buffers, socket_base::message_flags(0));}

async_write_some是异步写的函数,这个异步写函数有两个参数,第一个参数为ConstBufferSequence常引用类型的buffer,就是构造buffer结构。

第二个参数为WriteToken类型,而WriteToken在上面定义了,是一个函数对象类型,返回值为void,参数为error_codesize_t,所以我们为了调用async_write_some函数也要传入一个符合WriteToken定义的函数,就是我们声明的AsyncWriteSomeToSocketErr函数,前两个参数为WriteToken规定的参数,第三个参数为Buffer的智能指针,这样通过智能指针保证我们发送的Buffer数据生命周期延长。

我们看一下AsyncWriteSomeToSocketErr函数的具体实现:

void Session::AsyncWriteSomeToSocketErr(const std::string& buffer) {//先构造一个发送节点send_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());//然后构造async_write_some的参数buffer和回调和函数socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()),//绑定成员函数的地址,类的对象,参数占位符1,参数占位符2std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, send_buffer_));
}//TCP缓冲区 收发端不对等 发11字节 TCP缓冲区只有5字节 那么要分两次发送,假设发送hello world ,第一次只发送hello,\
world未发送,那么如果用户再次调用WriteCallBackErr那么底层不保护发送顺序,那么可能收到的结果hello hello world world \
解决这种就是用一个队列把存储的数据存放到队列里面
void Session::AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer_) {if (err.value() != 0) {std::cout << "error occured!error code: " << err.value() << " . message: " << err.what() << std::endl;return;}if (bytes_transferred + buffer_->GetCurLen() < buffer_->GetTotalLen()) {//buffer_->GetCurLen() = buffer_->GetCurLen() + bytes_transferred;buffer_->SetCurLen(buffer_->GetCurLen() + bytes_transferred);socket_->async_write_some(boost::asio::buffer(buffer_->GetMsg() + buffer_->GetCurLen(), buffer_->GetTotalLen() - buffer_->GetCurLen()),std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, buffer_));}
}

这段代码的作用是实现异步发送数据的功能,主要包括两个函数:AsyncWriteSomeToSocketErrAsyncWriteSomeCallBackErr

  • AsyncWriteSomeToSocketErr 函数的作用是将数据放入发送队列中,并触发异步写操作。具体步骤如下:

    • 首先,使用 std::make_shared 创建一个 Buffer 对象,这个对象用于存储要发送的数据。
    • 然后,使用 socket_->async_write_some 函数触发异步写操作,将数据写入套接字。在这里,你绑定了回调函数 AsyncWriteSomeCallBackErr
  • AsyncWriteSomeCallBackErr 函数是异步写操作完成后的回调函数。它的主要作用是处理写操作的结果,检查是否发生错误,以及是否需要继续发送剩余的数据。具体步骤如下:

    • 首先,检查 err 参数,如果其值不为 0,表示发送出现错误,就输出错误信息并返回。
    • 然后没有错误,检查已传输的字节数 bytes_transferred 加上 buffer_ 对象中已经发送的字节数 buffer_->GetCurLen() 是否小于总的数据长度 buffer_->GetTotalLen()。如果小于总长度,说明还有剩余数据需要发送。
    • 如果有剩余数据需要发送,就更新 buffer_ 对象中的已发送字节数 buffer_->SetCurLen(buffer_->GetCurLen() + bytes_transferred),然后继续触发异步写操作,将剩余的数据发送出去。这里再次调用 socket_->async_write_some 并绑定了相同的回调函数,以便在写操作完成后再次检查和处理。
    • 总体来说,这段代码实现了异步发送数据的逻辑,确保了数据的完整性和发送顺序。通过使用回调函数,可以在每次写操作完成后处理相应的逻辑,包括检查错误、更新已发送字节数以及触发下一次写操作。

AsyncWriteSomeToSocketErr函数里判断如果已经发送的字节数没有达到要发送的总字节数,那么久更新节点已经发送的长度,然后计算剩余要发送的长度,如果有数据未发送完,再次调用async_write_some函数异步发送。

但是这个函数并不能投入实际应用,因为async_write_some回调函数返回已发送的字节数可能并不是全部长度。比如TCP发送缓存区总大小为8字节,但是有3字节未发送(上一次未发送完),这样剩余空间为5字节。
在这里插入图片描述
此时我们调用async_write_some发送hello world!实际发送的长度就是为5,也就是只发送了hello,剩余world!通过我们的回调继续发送。

而实际开发的场景用户是不清楚底层tcp的多路复用调用情况的,用户想发送数据的时候就调用WriteToSocketErr,或者循环调用WriteToSocketErr,很可能在一次没发送完数据还未调用回调函数时再次调用WriteToSocketErr,因为boost::asio封装的时epoll和iocp等多路复用模型。当写事件就绪后就发数据,发送的数据按照async_write_some调用的顺序发送,所以回调函数内调用的async_write_some可能并没有被及时调用。

比如我们如下代码:

//用户发送数据
AsyncWriteSomeToSocketErr("Hello World!");
//用户无感知下层调用情况又一次发送了数据
AsyncWriteSomeToSocketErr("Hello World!");

那么很可能第一次只发送了Hello,后面的数据没发完,第二次发送了Hello World!之后又发送了World!

所以对端收到的数据很可能是HelloHello World! World!

3、异步写void AsyncWriteSomeToSocket(const std::string& buffer)

那怎么解决这个问题呢,我们可以通过队列保证应用层的发送顺序。我们在Session中定义一个发送队列,然后重新定义正确的异步发送函数和回调处理:

#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"class Session {
public:Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);bool Connect(boost::asio::ip::tcp::endpoint& ep);//异步写  这个异步写存在问题void AsyncWriteSomeToSocketErr(const std::string& buffer);void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);void AsyncWriteSomeToSocket(const std::string& buffer);void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);
private:std::shared_ptr<boost::asio::ip::tcp::socket> socket_;std::shared_ptr<Buffer> send_buffer_;std::queue<std::shared_ptr<Buffer>> send_queue_;bool send_padding_;
};#endif // !__ASYNC_DEMO_H_2023_8_22__

定义了bool变量send_padding_,该变量为true表示一个节点还未发送完,false代表发送完成。send_padding_ 用来缓存要发送的消息协议节点,是一个队列。

我们实现异步发送功能:

Session::Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket):socket_(socket),send_padding_(false)
{send_buffer_ = nullptr;if (!send_queue_.empty()) {send_queue_.pop();}
}

函数实现:

void Session::AsyncWriteSomeToSocket(const std::string& buffer) {//发送节点插入队列send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));//判断是否还有未发完的数据,false,表示没有,true表示还有if (send_padding_) {return;}//异步发送数据socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()), std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));send_padding_ = true;
}void Session::AsyncWriteSomeCallBack(const boost::system::error_code& err, size_t bytes_transferred) {if (err.value() != 0) {std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}//取出队列中队首元素std::shared_ptr<Buffer> send_data = send_queue_.front();send_data->SetCurLen(send_data->GetCurLen() + bytes_transferred);//数据未发送完,继承调用异步函数取出队首元素发送if (send_data->GetCurLen() < send_data->GetTotalLen()) {socket_->async_write_some(boost::asio::buffer(send_data->GetMsg() + send_data->GetCurLen(),send_data->GetTotalLen() - send_data->GetCurLen()),std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}//如果这个数据发送完了,把数据节点取出来send_queue_.pop();//判断队列里面是否还有下一个数据if (send_queue_.empty()) {send_padding_ = false;return;}//有数据则继续发送if (!send_queue_.empty()) {std::shared_ptr<Buffer> send_data_next = send_queue_.front();//异步发送的地址偏移socket_->async_write_some(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),send_data_next->GetTotalLen() - send_data_next->GetCurLen()),std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));}
}

这段代码的作用是实现异步发送数据并保证发送顺序的逻辑,主要包括两个函数:AsyncWriteSomeToSocketAsyncWriteSomeCallBack

  • AsyncWriteSomeToSocket 函数的作用是将数据放入发送队列中,并触发异步写操作。具体步骤如下:

    • 首先,将一个新的 Buffer 对象(用于存储要发送的数据)插入到 send_queue_ 队列中。
    • 接着,检查是否还有未发完的数据,如果有,说明还在等待前一次异步写操作完成,直接返回。
    • 如果没有未发完的数据,说明可以触发异步发送操作,使用 socket_->async_write_some 函数将数据写入套接字,并绑定回调函数 AsyncWriteSomeCallBack
  • AsyncWriteSomeCallBack 函数是异步写操作完成后的回调函数。其主要作用是处理写操作的结果,继续发送队列中的下一个数据。具体步骤如下:

    • 首先,检查 err 参数,如果其值不为 0,表示发送出现错误,就输出错误信息并返回。
    • 然后,取出队列中队首元素,该元素是一个 Buffer 对象,表示待发送的数据。
    • 接着,更新这个数据的已发送字节数 send_data->SetCurLen(send_data->GetCurLen() + bytes_transferred)
    • 然后,检查数据是否已经全部发送完,如果未发送完,则继续触发异步写操作,将剩余的数据发送出去。
    • 如果这个数据已经发送完毕,就从队列中移除这个数据节点,并检查队列是否还有下一个数据。
    • 如果队列不为空,表示还有数据需要发送,就取出下一个数据节点,更新已发送字节数,并触发下一个异步写操作,以便发送下一个数据。

这段代码的设计确保了数据的发送顺序,即使在异步发送的情况下也可以保持数据的完整性和顺序。如果发送错误,它也会正确地处理错误情况。

async_write_some函数不能保证每次回调函数触发时发送的长度为要总长度,这样我们每次都要在回调函数判断发送数据是否完成,asio提供了一个更简单的发送函数async_send,这个函数在发送的长度未达到我们要求的长度时就不会触发回调,所以触发回调函数时要么时发送出错了要么是发送完成了,其内部的实现原理就是帮我们不断的调用async_write_some直到完成发送,所以async_send不能和async_write_some混合使用,我们基于async_send封装另外一个发送函数。

4、异步写void AsyncSendToSocket(const std::string& buffer)

函数定义:

#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"class Session {
public:Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);bool Connect(boost::asio::ip::tcp::endpoint& ep);//异步写  这个异步写存在问题void AsyncWriteSomeToSocketErr(const std::string& buffer);void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);void AsyncWriteSomeToSocket(const std::string& buffer);void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);//优先取这个void AsyncSendToSocket(const std::string& buffer);void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);
private:std::shared_ptr<boost::asio::ip::tcp::socket> socket_;std::shared_ptr<Buffer> send_buffer_;std::queue<std::shared_ptr<Buffer>> send_queue_;bool send_padding_;
};#endif // !__ASYNC_DEMO_H_2023_8_22__

函数实现:

void Session::AsyncSendToSocket(const std::string& buffer) {//把发送消息协议构造成节点插入队列send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));//判断是否还有未发完数据if (send_padding_) {return;}//异步发送数据socket_->async_send(boost::asio::buffer(buffer.c_str(), buffer.length()),std::bind(&Session::AsyncSendCallBack, this, std::placeholders::_1, std::placeholders::_2));send_padding_ = true;
}void Session::AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {if (0 != err.value()) {//发送数据失败std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}//因为调用的是async_send()它的设计目标是简化发送数据的过程,\让用户不必关心数据的细节,只需提供要发送的数据和回调函数即可send_queue_.pop();if (send_queue_.empty()) {send_padding_ = false;return;}if (!send_queue_.empty()) {std::shared_ptr<Buffer> send_data_next = send_queue_.front();//异步发送发生地址偏移socket_->async_send(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),send_data_next->GetTotalLen() - send_data_next->GetCurLen()),std::bind(&Session::AsyncSendCallBack, this,std::placeholders::_1, std::placeholders::_2));}
}

这段代码的目的是实现异步发送数据,并在发送完成后调用回调函数进行处理。这与你之前提到的代码逻辑类似,但使用了 async_send 函数代替了 async_write_some,并且没有需要手动维护已发送字节数。

  • 具体的逻辑如下:

    • AsyncSendToSocket 函数用于将数据包装成一个 Buffer 对象并插入发送队列 send_queue_ 中。

    • 接着,检查是否已经有数据正在等待发送(send_padding_ 是否为 true),如果是,则说明还在等待前一次异步发送完成,直接返回。

    • 如果没有等待发送的数据,就调用 socket_->async_send 函数进行异步发送。这个函数会将数据发送到套接字,并在发送完成后调用回调函数 AsyncSendCallBack

    • AsyncSendCallBack 回调函数中,首先检查错误码 err,如果不为 0,表示发送出现错误,输出错误信息并返回。

    • 如果发送成功,就从发送队列中弹出已发送的数据 (send_queue_.pop()),并检查队列是否为空。如果队列为空,说明没有待发送的数据,将 send_padding_ 设置为 false 表示没有数据需要发送。

    • 如果队列不为空,表示还有待发送的数据,就取出队列的头部元素,即下一个要发送的数据,然后调用 socket_->async_send 再次异步发送数据。这个过程会重复,直到队列中的数据全部发送完毕。

总体而言,这段代码实现了异步发送数据的功能,保证了发送的顺序,同时也能正确处理发送过程中的错误。不同之处在于,它使用了 async_send 函数,该函数封装了发送的细节,使得发送数据更加方便。

5、异步读void AsyncReadSomeToSocket(const std::string& buffer)

接下来介绍异步读操作,异步读操作和异步的写操作类似同样又async_read_someasync_receive函数,前者触发的回调函数获取的读数据的长度可能会小于要求读取的总长度,后者触发的回调函数读取的数据长度等于读取的总长度。

先基于async_read_some封装一个读取的函数AsyncReadSomeToSocket,同样在Session类的声明中添加一些变量:

函数定义:

#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"class Session {
public:Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);bool Connect(boost::asio::ip::tcp::endpoint& ep);//异步写  这个异步写存在问题void AsyncWriteSomeToSocketErr(const std::string& buffer);void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);void AsyncWriteSomeToSocket(const std::string& buffer);void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);//优先取这个void AsyncSendToSocket(const std::string& buffer);void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);//异步读 优先取这个void AsyncReadSomeToSocket(const std::string& buffer);void AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred);private:std::shared_ptr<boost::asio::ip::tcp::socket> socket_;std::shared_ptr<Buffer> send_buffer_;std::queue<std::shared_ptr<Buffer>> send_queue_;std::shared_ptr<Buffer> recv_buffer_;bool send_padding_;bool recv_padding_;
};#endif // !__ASYNC_DEMO_H_2023_8_22__

函数实现:

void Session::AsyncReadSomeToSocket(const std::string& buffer) {//判断是否正在读数据,这里第一次读数据if (recv_padding_) {return;}recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());//异步读取数据socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));recv_padding_ = true;
}void Session::AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {if (0 != err.value()) {std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}//判断读取的字节数,没有读取完继续读取recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);if (recv_buffer_->GetCurLen() < recv_buffer_->GetTotalLen()) {socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}//将数据投递到队列里交给逻辑线程处理,此处略去//如果读完了则将标记置为falserecv_padding_ = false;recv_buffer_ = nullptr;
}

这段代码的主要功能是异步读取数据,并在读取完成后调用回调函数 AsyncReadSomeCallBack 处理数据。以下是代码逻辑的详细解释:

  • AsyncReadSomeToSocket 函数用于异步读取数据。在这个函数中,首先检查 recv_padding_ 是否为 true。如果为 true,表示正在读取数据,直接返回,避免重复读取。

  • 如果 recv_padding_false,说明可以开始读取数据。这时,创建一个 Buffer 对象 recv_buffer_,并初始化为要读取的数据。

  • 接着,调用 socket_->async_read_some 函数进行异步读取数据。这个函数会在读取完成后调用回调函数 AsyncReadSomeCallBack

  • AsyncReadSomeCallBack 回调函数中,首先检查错误码 err,如果不为 0,表示读取出现错误,输出错误信息并返回。

  • 如果读取成功,将已读取的字节数添加到 recv_buffer_ 的当前长度 CurLen 中。然后,检查是否已经读取完所有数据,即 CurLen 是否小于 TotalLen

  • 如果未读取完,继续调用 socket_->async_read_some 函数继续异步读取剩余的数据,直到读取完所有数据。

  • 如果读取完了,将 recv_padding_ 置为 false,表示没有正在读取的数据。最后,清空 recv_buffer_ 对象,以便下次读取新的数据。

这段代码实现了异步读取数据的逻辑,确保数据被正确读取并处理。如果数据没有完全读取,它会继续异步读取剩余的部分,直到读取完整个数据。如果有新的数据需要读取,可以再次调用 AsyncReadSomeToSocket

6、异步读void AsyncReceiveToSocket(const std::string& buffer)

我们基于async_receive再封装一个接收数据的函数:

函数声明:

#pragma once
#ifndef __ASYNC_DEMO_H_2023_8_22__
#define __ASYNC_DEMO_H_2023_8_22__
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"class Session {
public:Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);bool Connect(boost::asio::ip::tcp::endpoint& ep);//异步写  这个异步写存在问题void AsyncWriteSomeToSocketErr(const std::string& buffer);void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);void AsyncWriteSomeToSocket(const std::string& buffer);void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);//优先取这个void AsyncSendToSocket(const std::string& buffer);void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);//异步读 优先取这个void AsyncReadSomeToSocket(const std::string& buffer);void AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred);void AsyncReceiveToSocket(const std::string& buffer);void AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred);private:std::shared_ptr<boost::asio::ip::tcp::socket> socket_;std::shared_ptr<Buffer> send_buffer_;std::queue<std::shared_ptr<Buffer>> send_queue_;std::shared_ptr<Buffer> recv_buffer_;bool send_padding_;bool recv_padding_;
};#endif // !__ASYNC_DEMO_H_2023_8_22__

函数实现:

void Session::AsyncReceiveToSocket(const std::string& buffer) {//判断是否有数据正在读取if (recv_padding_) {return;}recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());socket_->async_receive(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),std::bind(&Session::AsyncReceiveCallBack, this, std::placeholders::_1, std::placeholders::_2));recv_padding_ = true;
}void Session::AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {if (0 != err.value()) {std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);recv_padding_ = false;recv_buffer_ = nullptr;
}

这段代码看起来非常类似于前面提到的异步读取数据的代码。它实现了异步接收数据的逻辑,以下是代码的详细解释:

  • AsyncReceiveToSocket 函数用于异步接收数据。首先,它检查 recv_padding_ 是否为 true,如果为 true,表示已经有数据在读取,直接返回,以避免重复接收。

  • 如果 recv_padding_false,说明可以开始接收数据。这时,创建一个 Buffer 对象 recv_buffer_,并初始化为要接收的数据。

接着,调用 socket_->async_receive 函数进行异步接收数据。这个函数会在接收完成后调用回调函数 AsyncReceiveCallBack

AsyncReceiveCallBack 回调函数中,首先检查错误码 err,如果不为 0,表示接收出现错误,输出错误信息并返回。

如果接收成功,将已接收的字节数添加到 recv_buffer_ 的当前长度 CurLen 中。然后,将 recv_padding_ 置为 false,表示没有正在接收的数据。

最后,清空 recv_buffer_ 对象,以便下次接收新的数据。

这段代码实现了异步接收数据的逻辑,确保数据被正确接收并处理。如果数据没有完全接收,它会继续异步接收剩余的部分,直到接收完整个数据。如果有新的数据需要接收,可以再次调用 AsyncReceiveToSocket

同样async_read_someasync_receive不能混合使用,否则会出现逻辑问题。

7、总结

总体代码声明:

#pragma once
#ifndef __ASYNC_DEMO_H_2023_8_22__
#define __ASYNC_DEMO_H_2023_8_22__
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"class Session {
public:Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);bool Connect(boost::asio::ip::tcp::endpoint& ep);//异步写  这个异步写存在问题void AsyncWriteSomeToSocketErr(const std::string& buffer);void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);void AsyncWriteSomeToSocket(const std::string& buffer);void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);//优先取这个void AsyncSendToSocket(const std::string& buffer);void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);//异步读 优先取这个void AsyncReadSomeToSocket(const std::string& buffer);void AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred);void AsyncReceiveToSocket(const std::string& buffer);void AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred);private:std::shared_ptr<boost::asio::ip::tcp::socket> socket_;std::shared_ptr<Buffer> send_buffer_;std::queue<std::shared_ptr<Buffer>> send_queue_;std::shared_ptr<Buffer> recv_buffer_;bool send_padding_;bool recv_padding_;
};#endif // !__ASYNC_DEMO_H_2023_8_22__

总体代码定义:

#include"async_demo.h"Session::Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket):socket_(socket),send_padding_(false),recv_padding_(false)
{send_buffer_ = nullptr;recv_buffer_ = nullptr;if (!send_queue_.empty()) {send_queue_.pop();}
}bool Session::Connect(boost::asio::ip::tcp::endpoint& ep) {socket_->connect(ep);return true;
}void Session::AsyncWriteSomeToSocketErr(const std::string& buffer) {//先构造一个发送节点send_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());//然后构造async_write_some的参数buffer和回调和函数socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()),//绑定成员函数的地址,类的对象,参数占位符1,参数占位符2std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, send_buffer_));
}//TCP缓冲区 收发端不对等 发11字节 TCP缓冲区只有5字节 那么要分两次发送,假设发送hello world ,第一次只发送hello,\
world未发送,那么如果用户再次调用WriteCallBackErr那么底层不保护发送顺序,那么可能收到的结果hello hello world world \
解决这种就是用一个队列把存储的数据存放到队列里面
void Session::AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer_) {if (err.value() != 0) {std::cout << "error occured!error code: " << err.value() << " . message: " << err.what() << std::endl;return;}if (bytes_transferred + buffer_->GetCurLen() < buffer_->GetTotalLen()) {//buffer_->GetCurLen() = buffer_->GetCurLen() + bytes_transferred;buffer_->SetCurLen(buffer_->GetCurLen() + bytes_transferred);socket_->async_write_some(boost::asio::buffer(buffer_->GetMsg() + buffer_->GetCurLen(), buffer_->GetTotalLen() - buffer_->GetCurLen()),std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, buffer_));}
}void Session::AsyncWriteSomeToSocket(const std::string& buffer) {//发送节点插入队列send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));//判断是否还有未发完的数据,false,表示没有,true表示还有if (send_padding_) {return;}//异步发送数据socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()), std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));send_padding_ = true;
}void Session::AsyncWriteSomeCallBack(const boost::system::error_code& err, size_t bytes_transferred) {if (err.value() != 0) {std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}//取出队列中队首元素std::shared_ptr<Buffer> send_data = send_queue_.front();send_data->SetCurLen(send_data->GetCurLen() + bytes_transferred);//数据未发送完,继承调用异步函数取出队首元素发送if (send_data->GetCurLen() < send_data->GetTotalLen()) {socket_->async_write_some(boost::asio::buffer(send_data->GetMsg() + send_data->GetCurLen(),send_data->GetTotalLen() - send_data->GetCurLen()),std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}//如果这个数据发送完了,把数据节点取出来send_queue_.pop();//判断队列里面是否还有下一个数据if (send_queue_.empty()) {send_padding_ = false;return;}//有数据则继续发送if (!send_queue_.empty()) {std::shared_ptr<Buffer> send_data_next = send_queue_.front();//异步发送的地址偏移socket_->async_write_some(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),send_data_next->GetTotalLen() - send_data_next->GetCurLen()),std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));}
}void Session::AsyncSendToSocket(const std::string& buffer) {//把发送消息协议构造成节点插入队列send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));//判断是否还有未发完数据if (send_padding_) {return;}//异步发送数据socket_->async_send(boost::asio::buffer(buffer.c_str(), buffer.length()),std::bind(&Session::AsyncSendCallBack, this, std::placeholders::_1, std::placeholders::_2));send_padding_ = true;
}void Session::AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {if (0 != err.value()) {//发送数据失败std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}//因为调用的是async_send()它的设计目标是简化发送数据的过程,\让用户不必关心数据的细节,只需提供要发送的数据和回调函数即可send_queue_.pop();if (send_queue_.empty()) {send_padding_ = false;return;}if (!send_queue_.empty()) {std::shared_ptr<Buffer> send_data_next = send_queue_.front();//异步发送发生地址偏移socket_->async_send(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),send_data_next->GetTotalLen() - send_data_next->GetCurLen()),std::bind(&Session::AsyncSendCallBack, this,std::placeholders::_1, std::placeholders::_2));}
}void Session::AsyncReadSomeToSocket(const std::string& buffer) {//判断是否正在读数据,这里第一次读数据if (recv_padding_) {return;}recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());//异步读取数据socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));recv_padding_ = true;
}void Session::AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {if (0 != err.value()) {std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}//判断读取的字节数,没有读取完继续读取recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);if (recv_buffer_->GetCurLen() < recv_buffer_->GetTotalLen()) {socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}//将数据投递到队列里交给逻辑线程处理,此处略去//如果读完了则将标记置为falserecv_padding_ = false;recv_buffer_ = nullptr;
}void Session::AsyncReceiveToSocket(const std::string& buffer) {//判断是否有数据正在读取if (recv_padding_) {return;}recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());socket_->async_receive(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),std::bind(&Session::AsyncReceiveCallBack, this, std::placeholders::_1, std::placeholders::_2));recv_padding_ = true;
}void Session::AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {if (0 != err.value()) {std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);recv_padding_ = false;recv_buffer_ = nullptr;
}

本文介绍了boost asio异步读写的操作,仅仅是代码片段和api的封装便于大家理解,下一篇利用这些异步api写一个异步的服务器展示收发效果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/104021.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Grafana Dashboard 备份方案

文章目录 Grafana Dashboard 备份方案引言工具简介支持的组件要求配置备份安装使用 pypi 安装grafana备份工具配置环境变量使用Grafana Backup Tool 进行备份恢复备份 Grafana Dashboard恢复 Grafana Dashboard结论Grafana Dashboard 备份方案 引言 每个使用 Grafana 的同学都…

Anomalib:异常检测的深度学习库 -- 应用Anomalib训练自己的图片

文章目录 资料汇总 Github链接&#xff1a;https://github.com/openvinotoolkit/anomalib/blob/main/README.md 论文链接&#xff1a;https://arxiv.org/pdf/2202.08341v1.pdf 其他参考资料&#xff1a;https://paperswithcode.com/paper/efficientad-accurate-visual-anomaly-…

使用css实现点击切换active效果

不使用js&#xff0c;纯css实现点击切换active样式 一个父盒子中嵌套小标签,横向排列 html <div class"box"><a href"#">选项1</a><a href"#">选项2</a><a href"#">选项3</a><a href&…

lvs实现DR模型搭建

目录 一&#xff0c;实现DR模型搭建 1&#xff0c; 负载调度器配置 1.1调整ARP参数 1.2 配置虚拟IP地址重启网卡 1.3 安装ipvsadm 1.4 加载ip_vs模块 1.5 启动ipvsadm服务 1.6 配置负载分配策略 1.7 保存策略 2&#xff0c; web节点配置 1.1 调整ARP参数 1.2 配置虚拟I…

多线程与高并发——并发编程(1)

文章目录 并发编程一、线程的基本概念1 基础概念1.1 进程和线程1.2 多线程1.3 串行、并行、并发1.4 同步异步、阻塞非阻塞 2 线程的创建2.1 继承Thread类&#xff0c;重写run方法2.2 实现Runnable接口&#xff0c;实现run方法2.3 实现Callable接口&#xff0c;实现call方法&…

STL之vector(讲解迭代器失效,拷贝构造函数等现代版写法)

还是老规矩&#xff0c;直接上代码&#xff1a; #pragma once #include "riterator.hpp" #include <iostream> #include <assert.h> #include <set> #include <map> using namespace std; namespace cc {template<class T>class vect…

Node基础--Node简介以及安装教程

1.Node简介 Node.js发布于2009年5月&#xff0c;由Ryan Dahl开发&#xff0c;是一个基于Chrome V8引擎的JavaScript运行环境&#xff0c;使用了一个事件驱动、非阻塞式I/O模型&#xff0c;让JavaScript 运行在服务端的开发平台&#xff0c;它让JavaScript成为与PHP、Python、Pe…

静态代码扫描工具 Sonar 配置及使用

概览 Sonar 是一个用于代码质量管理的开放平台。通过插件机制&#xff0c;Sonar 可以集成不同的测试工具&#xff0c;代码分析工具&#xff0c;以及持续集成工具。与持续集成工具&#xff08;例如 Hudson/Jenkins 等&#xff09;不同&#xff0c;Sonar 并不是简单地把不同的代…

B树和B+树MySQL为什么用B+树?

文章目录 B树和B树B树B树的定义B树的插入操作删除操作 B树B树的定义B树的插入操作删除操作 B树和B树的区别?MySQL数据库为啥用B树作为索引&#xff0c;而不用B树? B树和B树 原文链接&#xff1a;https://blog.csdn.net/jinking01/article/details/115130286 B树 B树的定义…

变色树脂T-46CC详解

一、产品介绍 Tulsimer T-46 CC 是一款专门研制的、优质的、强酸型的聚苯乙烯架构的阳离子交换树脂&#xff0c;具有核子级磺酸官能团&#xff0c; 并同时拥有绝佳的物理及化学稳定品质&#xff0c;因此广泛应用于水处理当中。 Tulsimer T-46 CC 主要用于水净化处理&#xff…

智慧党建VR虚拟3D数字化展厅发展和传承传统文化

三维全景虚拟现实技术应用在虚拟展馆中&#xff0c;主要是通过全景照片的虚拟与建模&#xff0c;营造出三维虚拟仿真的场景&#xff0c;从而结合展馆展示的需求&#xff0c;营造出更加有效的氛围&#xff0c;起到优化展示效果的作用。 三维全景虚拟现实技术的应用&#xff0c;能…

Java【手撕双指针】LeetCode 611. “有效三角形个数“, 图文详解思路分析 + 代码

文章目录 前言一、有效三角形个数1, 题目2, 思路分析1, 从左往右 or 从右往左?3, 代码展示 前言 各位读者好, 我是小陈, 这是我的个人主页, 希望我的专栏能够帮助到你: &#x1f4d5; JavaSE基础: 基础语法, 类和对象, 封装继承多态, 接口, 综合小练习图书管理系统等 &#x1…

BootstrapBlazor组件使用:数据注解

文章目录 前言BB数据注解数据注解源码数据注解简介注解简单实例[BB 编辑弹窗](https://www.blazor.zone/edit-dialog)[ValidateForm 表单组件](https://www.blazor.zone/validate-form)使用简介 前言 BootstrapBlazor(一下简称BB)是个特别好用的组件&#xff0c;基本上满足了大…

jupyter notebook出现ERR_SSL_VERSION_OR_CIPHER_MISMATCH解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

软件测试架构师在实际工作中都做哪些事情呢?

软件测试架构师在实际工作中&#xff0c;都做哪些事情呢&#xff1f; 一家业务体系庞大、复杂的公司的测试架构师的职责主要有五个&#xff1a; 1、测试团队的技术带头人 测试架构师会关注整个团队的技术提升&#xff0c;包括技术难题的攻关&#xff0c;团队遇到的技术难题&…

(6)(6.3) 自动任务中的相机控制

文章目录 前言 6.3.1 概述 6.3.2 自动任务类型 6.3.3 创建合成图像 前言 本文介绍 ArduPilot 的相机和云台命令&#xff0c;并说明如何在 Mission Planner 中使用这些命令来定义相机勘测任务。这些说明假定已经连接并配置了相机触发器和云台(camera trigger and gimbal ha…

ORB-SLAM2报错集合(数据集测试系列1)

目录 错误1 错误2 错误3 错误4 错误5 错误6 错误7 错误8 TUM-RGBD测试 KITTI测试 EuRoC测试 写在前面~ ORB-SLAM2 github链接&#xff1a;GitHub - electech6/ORB_SLAM2_detailed_comments: Detailed comments for ORB-SLAM2 with trouble-shooting, key formula …

QChart:数据可视化(用图像形式显示数据内容)

1、数据可视化的图形有&#xff1a;柱状/线状/条形/面积/饼/点图、仪表盘、走势图&#xff0c;弦图、金字塔、预测曲线图、关系图、数学公式图、行政地图、GIS地图等。 2、在QT Creator的主页面&#xff0c;点击 欢迎》示例》右侧输入框 输入Chart&#xff0c;即可查看到QChar…

回归预测 | MATLAB实现SSA-RF麻雀搜索优化算法优化随机森林算法多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现SSA-RF麻雀搜索优化算法优化随机森林算法多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现SSA-RF麻雀搜索优化算法优化随机森林算法多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09;…

数据结构—排序

8.排序 8.1排序的概念 什么是排序&#xff1f; 排序&#xff1a;将一组杂乱无章的数据按一定规律顺序排列起来。即&#xff0c;将无序序列排成一个有序序列&#xff08;由小到大或由大到小&#xff09;的运算。 如果参加排序的数据结点包含多个数据域&#xff0c;那么排序往…