1 socket封装
适用于windows linux, ipv4 ,ipv6,后面有适用于windows的更简单的封装,看适用,随取随用
#pragma once
#ifdef _WIN32
#include <winsock2.h>
#include <Windows.h>#include <mswsock.h>
#include <inaddr.h>
#include <ws2ipdef.h>
#include <WS2tcpip.h>
#else
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#endif#include <vector>
#include <string>
#include <atomic>
#include <memory>
#include <mutex>
#include <map>#ifdef _WIN32
typedef SOCKET socket_t;
#else
typedef int socket_t;
#endif#define rtp_error_t int#ifdef _WIN32
//typedef unsigned int socklen_t;
#endif#if defined(UVGRTP_HAVE_SENDMSG) && !defined(UVGRTP_HAVE_SENDMMSG)
struct mmsghdr {struct msghdr msg_hdr;unsigned int msg_len;
};
static inline
int sendmmsg(int sockfd, struct mmsghdr* msgvec, unsigned int vlen,int flags)
{ssize_t n = 0;for (unsigned int i = 0; i < vlen; i++) {ssize_t ret = sendmsg(sockfd, &msgvec[i].msg_hdr, flags);if (ret < 0)break;n += ret;}if (n == 0)return -1;return int(n);
}
#endifconst int MAX_BUFFER_COUNT = 256;/* Vector of buffers that contain a full RTP frame */
typedef std::vector<std::pair<size_t, uint8_t*>> buf_vec;/* Vector of RTP frames constructed from buf_vec entries */
typedef std::vector<std::vector<std::pair<size_t, uint8_t*>>> pkt_vec;typedef rtp_error_t(*packet_handler_vec)(void*, buf_vec&);struct socket_packet_handler {void* arg = nullptr;packet_handler_vec handler = nullptr;
};class socket {
public:socket(int rce_flags);~socket();/* Create socket using "family", "type" and "protocol"*** Return RTP_OK on success* return RTP_SOCKET_ERROR if creating the socket failed */rtp_error_t init(short family, int type, int protocol);/* Same as bind(2), assigns an address for the underlying socket object** Return RTP_OK on success* Return RTP_BIND_ERROR if the bind failed */rtp_error_t bind(short family, unsigned host, short port);rtp_error_t bind(sockaddr_in& local_address);rtp_error_t bind_ip6(sockaddr_in6& local_address);/* Check if the given address is IPv4 or IPv6** Return 1 for IPv4* Return 2 for IPv6* return -1 for error */int check_family(std::string addr);/* Same as setsockopt(2), used to manipulate the underlying socket object** Return RTP_OK on success* Return RTP_GENERIC_ERROR if setsockopt failed */rtp_error_t setsockopt(int level, int optname, const void* optval, socklen_t optlen);/* Same as send(2), send message to the given address with send_flags* It is required to give at least one type of address: sockaddr_in or sockaddr_in6. It is possible* to give both and the function wil pick the correct one to use** It is possible to combine multiple buffers and send them as one RTP frame by calling* the sendto() with a vector containing the buffers and their lengths** Write the amount of bytes sent to "bytes_sent" if it's not NULL** Return RTP_OK on success and write the amount of bytes sent to "bytes_sent"* Return RTP_SEND_ERROR on error and set "bytes_sent" to -1 */rtp_error_t sendto(sockaddr_in& addr, sockaddr_in6& addr6, uint8_t* buf, size_t buf_len, int send_flags);rtp_error_t sendto(sockaddr_in& addr, sockaddr_in6& addr6, uint8_t* buf, size_t buf_len, int send_flags, int* bytes_sent);rtp_error_t sendto(uint32_t ssrc, sockaddr_in& addr, sockaddr_in6& addr6, buf_vec& buffers, int send_flags);rtp_error_t sendto(uint32_t ssrc, sockaddr_in& addr, sockaddr_in6& addr6, buf_vec& buffers, int send_flags, int* bytes_sent);rtp_error_t sendto(uint32_t ssrc, sockaddr_in& addr, sockaddr_in6& addr6, pkt_vec& buffers, int send_flags);rtp_error_t sendto(uint32_t ssrc, sockaddr_in& addr, sockaddr_in6& addr6, pkt_vec& buffers, int send_flags, int* bytes_sent);/* Same as recv(2), receives a message from socket (remote address not known)** Write the amount of bytes read to "bytes_read" if it's not NULL** Return RTP_OK on success and write the amount of bytes received to "bytes_read"* Return RTP_INTERRUPTED if the call was interrupted due to timeout and set "bytes_sent" to 0* Return RTP_GENERIC_ERROR on error and set "bytes_sent" to -1 */rtp_error_t recv(uint8_t* buf, size_t buf_len, int recv_flags);rtp_error_t recv(uint8_t* buf, size_t buf_len, int recv_flags, int* bytes_read);/* Same as recvfrom(2), receives a message from remote** Write the sender address to "sender" if it's not NULL* Write the amount of bytes read to "bytes_read" if it's not NULL** Return RTP_OK on success and write the amount of bytes sent to "bytes_sent"* Return RTP_INTERRUPTED if the call was interrupted due to timeout and set "bytes_sent" to 0* Return RTP_GENERIC_ERROR on error and set "bytes_sent" to -1 */rtp_error_t recvfrom(uint8_t* buf, size_t buf_len, int recv_flags, sockaddr_in* sender,sockaddr_in6* sender6, int* bytes_read);rtp_error_t recvfrom(uint8_t* buf, size_t buf_len, int recv_flags, sockaddr_in* sender);rtp_error_t recvfrom(uint8_t* buf, size_t buf_len, int recv_flags, int* bytes_read);rtp_error_t recvfrom(uint8_t* buf, size_t buf_len, int recv_flags);/* Create sockaddr_in (IPv4) object using the provided information* NOTE: "family" must be AF_INET */static sockaddr_in create_sockaddr(short family, unsigned host, short port);/* Create sockaddr_in object using the provided information* NOTE: "family" must be AF_INET */static sockaddr_in create_sockaddr(short family, std::string host, short port);/* Create sockaddr_in6 (IPv6) object using the provided information */static sockaddr_in6 create_ip6_sockaddr(unsigned host, short port);static sockaddr_in6 create_ip6_sockaddr(std::string host, short port);static sockaddr_in6 create_ip6_sockaddr_any(short src_port);std::string get_socket_path_string() const;static std::string sockaddr_to_string(const sockaddr_in& addr);static std::string sockaddr_ip6_to_string(const sockaddr_in6& addr6);/* Get reference to the actual socket object */socket_t& get_raw_socket();/* Install a packet handler for vector-based send operations.** This handler allows the caller to inject extra functionality to the send operation* without polluting src/socket.cc with unrelated code* (such as collecting RTCP session statistics info or encrypting a packet)** "arg" is an optional parameter that can be passed to the handler when it's called */rtp_error_t install_handler(std::shared_ptr<std::atomic<std::uint32_t>> local_ssrc, void* arg, packet_handler_vec handler);rtp_error_t remove_handler(std::shared_ptr<std::atomic<std::uint32_t>> local_ssrc);static bool is_multicast(sockaddr_in& local_address);static bool is_multicast(sockaddr_in6& local_address);private:/* helper function for sending UPD packets, see documentation for sendto() above */rtp_error_t __sendto(sockaddr_in& addr, sockaddr_in6& addr6, bool ipv6, uint8_t* buf, size_t buf_len, int send_flags, int* bytes_sent);rtp_error_t __recv(uint8_t* buf, size_t buf_len, int recv_flags, int* bytes_read);rtp_error_t __recvfrom_ip6(uint8_t* buf, size_t buf_len, int recv_flags, sockaddr_in6* sender, int* bytes_read);rtp_error_t __recvfrom(uint8_t* buf, size_t buf_len, int recv_flags, sockaddr_in* sender, int* bytes_read);/* __sendtov() does the same as __sendto but it combines multiple buffers into one frame and sends them */rtp_error_t __sendtov(sockaddr_in& addr, sockaddr_in6& addr6, bool ipv6, buf_vec& buffers, int send_flags, int* bytes_sent);rtp_error_t __sendtov(sockaddr_in& addr, sockaddr_in6& addr6, bool ipv6, pkt_vec& buffers, int send_flags, int* bytes_sent);socket_t socket_;//sockaddr_in remote_address_;sockaddr_in local_address_;//sockaddr_in6 remote_ip6_address_;sockaddr_in6 local_ip6_address_;bool ipv6_;int rce_flags_;std::mutex handlers_mutex_;std::mutex conf_mutex_;/* __sendto() calls these handlers in order before sending the packet */std::multimap<std::shared_ptr<std::atomic<std::uint32_t>>, socket_packet_handler> buf_handlers_;/* __sendtov() calls these handlers in order before sending the packet */std::multimap<std::shared_ptr<std::atomic<std::uint32_t>>, socket_packet_handler> vec_handlers_;#ifndef NDEBUGuint64_t sent_packets_ = 0;uint64_t received_packets_ = 0;
#endif // !NDEBUG#ifdef _WIN32WSABUF buffers_[MAX_BUFFER_COUNT];
#elsestruct mmsghdr header_;struct iovec chunks_[MAX_BUFFER_COUNT];
#endif
};
具体实现
#include "sock.h"static inline void hex_dump(uint8_t* buf, size_t len)
{if (!buf)return;for (size_t i = 0; i < len; i += 10) {fprintf(stderr, "\t");for (size_t k = i; k < i + 10; ++k) {fprintf(stderr, "0x%02x ", buf[k]);}fprintf(stderr, "\n");}
}static inline void set_bytes(int* ptr, int nbytes)
{if (ptr)* ptr = nbytes;
}static inline void* memdup(const void* src, size_t len)
{uint8_t* dst = new uint8_t[len];std::memcpy(dst, src, len);return dst;
}#ifdef _WIN32static inline void win_get_last_error(void)
{wchar_t* s = NULL;FormatMessageW(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,NULL, WSAGetLastError(),MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),(LPWSTR)&s, 0, NULL);fprintf(stderr, "%S %d\n", s, WSAGetLastError());LocalFree(s);
}
#endif#include <thread>#ifdef _WIN32
#include <Windows.h>
#include <winsock2.h>
#include <Ws2tcpip.h>
#include <ws2def.h>
#include <ws2ipdef.h>
#else
#include <unistd.h>
#include <poll.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <netdb.h>
#endif#if defined(__MINGW32__) || defined(__MINGW64__)
#include "mingw_inet.hh"
using namespace mingw;
#endif#include <cstring>
#include <cassert>#define WSABUF_SIZE 256socket::socket(int rce_flags) :socket_(0),local_address_(),local_ip6_address_(),ipv6_(false),rce_flags_(rce_flags),
#ifdef _WIN32buffers_()
#elseheader_(),chunks_()
#endif
{}socket::~socket()
{// printf("Socket total sent packets is %lu and received packets is %lu", sent_packets_, received_packets_);#ifndef _WIN32close(socket_);
#elseclosesocket(socket_);
#endif
}rtp_error_t socket::init(short family, int type, int protocol)
{if (family == AF_INET6) {ipv6_ = true;}else {ipv6_ = false;}#ifdef _WIN32if ((socket_ = ::socket(family, type, protocol)) == INVALID_SOCKET) {win_get_last_error();
#elseif ((socket_ = ::socket(family, type, protocol)) < 0) {printf("Failed to create socket: %s", strerror(errno));
#endifreturn -1;}#ifdef _WIN32BOOL bNewBehavior = FALSE;DWORD dwBytesReturned = 0;WSAIoctl(socket_, _WSAIOW(IOC_VENDOR, 12), &bNewBehavior, sizeof(bNewBehavior), NULL, 0, &dwBytesReturned, NULL, NULL);
#endifreturn 0;}rtp_error_t socket::setsockopt(int level, int optname, const void* optval, socklen_t optlen)
{std::lock_guard<std::mutex> lg(conf_mutex_);if (::setsockopt(socket_, level, optname, (const char*)optval, optlen) < 0) {//strerror(errno), depricatedprintf("Failed to set socket options");return -1;}return 0;
}rtp_error_t socket::bind(short family, unsigned host, short port)
{assert(family == AF_INET);local_address_ = create_sockaddr(family, host, port);return bind(local_address_);
}bool socket::is_multicast(sockaddr_in & local_address)
{// Multicast addresses ranges from 224.0.0.0 to 239.255.255.255 (0xE0000000 to 0xEFFFFFFF)auto addr = local_address.sin_addr.s_addr;return (ntohl(addr) & 0xF0000000) == 0xE0000000;
}rtp_error_t socket::bind(sockaddr_in & local_address)
{local_address_ = local_address;printf("Binding to address %s", sockaddr_to_string(local_address_).c_str());if (!socket::is_multicast(local_address_)) {// Regular addressif (::bind(socket_, (struct sockaddr*)&local_address_, sizeof(local_address_)) < 0) {
#ifdef _WIN32win_get_last_error();
#elsefprintf(stderr, "%s\n", strerror(errno));
#endifprintf("Binding to port %u failed!", ntohs(local_address_.sin_port));return -1;}}else {// Multicast address// Reuse address to enabled receiving the same stream multiple timesconst int enable = 1;if (::setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, (const char*)&enable, sizeof(int)) < 0) {
#ifdef _WIN32win_get_last_error();
#elsefprintf(stderr, "%s\n", strerror(errno));
#endifprintf("Reuse address failed!");}// Bind with empty addressauto bind_addr_in = local_address_;bind_addr_in.sin_addr.s_addr = htonl(INADDR_ANY);if (::bind(socket_, (struct sockaddr*)&bind_addr_in, sizeof(bind_addr_in)) < 0) {
#ifdef _WIN32win_get_last_error();
#elsefprintf(stderr, "%s\n", strerror(errno));
#endifprintf("Binding to port %u failed!", ntohs(bind_addr_in.sin_port));return -1;}// Join multicast membershipstruct ip_mreq mreq {};mreq.imr_multiaddr.s_addr = local_address_.sin_addr.s_addr;mreq.imr_interface.s_addr = htonl(INADDR_ANY);if (::setsockopt(socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*)&mreq, sizeof(mreq)) < 0) {
#ifdef _WIN32win_get_last_error();
#elsefprintf(stderr, "%s\n", strerror(errno));
#endifprintf("Multicast join failed!");return -1;}}return 0;
}bool socket::is_multicast(sockaddr_in6 & local_address)
{// Multicast IP addresses have their first byte equals to 0xFFauto addr = local_address.sin6_addr.s6_addr;return addr[0] == 0xFF;
}rtp_error_t socket::bind_ip6(sockaddr_in6 & local_address)
{local_ip6_address_ = local_address;printf("Binding to address %s", sockaddr_ip6_to_string(local_ip6_address_).c_str());if (!socket::is_multicast(local_ip6_address_)) {if (::bind(socket_, (struct sockaddr*)&local_ip6_address_, sizeof(local_ip6_address_)) < 0) {
#ifdef _WIN32win_get_last_error();
#elsefprintf(stderr, "%s\n", strerror(errno));
#endifprintf("Binding to port %u failed!", ntohs(local_ip6_address_.sin6_port));return -1;}}else {// Multicast address// Reuse address to enabled receiving the same stream multiple timesconst int enable = 1;if (::setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, (const char*)&enable, sizeof(int)) < 0) {
#ifdef _WIN32win_get_last_error();
#elsefprintf(stderr, "%s\n", strerror(errno));
#endifprintf("Reuse address failed!");}// Bind with empty addressauto bind_addr_in = local_ip6_address_;bind_addr_in.sin6_addr = in6addr_any;if (::bind(socket_, (struct sockaddr*)&bind_addr_in, sizeof(bind_addr_in)) < 0) {
#ifdef _WIN32win_get_last_error();
#elsefprintf(stderr, "%s\n", strerror(errno));
#endifprintf("Binding to port %u failed!", ntohs(bind_addr_in.sin6_port));return -1;}// Join multicast membershipstruct ipv6_mreq mreq {};memcpy(&mreq.ipv6mr_multiaddr, &local_ip6_address_.sin6_addr, sizeof(mreq.ipv6mr_multiaddr));if (::setsockopt(socket_, IPPROTO_IPV6, IPV6_JOIN_GROUP, (char*)&mreq, sizeof(mreq)) < 0) {
#ifdef _WIN32win_get_last_error();
#elsefprintf(stderr, "%s\n", strerror(errno));
#endifprintf("Multicast join failed!");return -1;}}return 0;
}int socket::check_family(std::string addr)
{// Use getaddrinfo() to determine whether we are using ipv4 or ipv6 addressesstruct addrinfo hint, * res = NULL;memset(&hint, '\0', sizeof(hint));hint.ai_family = PF_UNSPEC;hint.ai_flags = AI_NUMERICHOST;if (getaddrinfo(addr.c_str(), NULL, &hint, &res) != 0) {printf("Invalid IP address");return -1;}if (res->ai_family == AF_INET6) {printf("Using an IPv6 address");return 2;}else {printf("Using an IPv4 address");return 1;}return -1;
}sockaddr_in socket::create_sockaddr(short family, unsigned host, short port)
{assert(family == AF_INET);sockaddr_in addr;memset(&addr, 0, sizeof(addr));addr.sin_family = family;addr.sin_port = htons(port);addr.sin_addr.s_addr = htonl(host);return addr;
}sockaddr_in socket::create_sockaddr(short family, std::string host, short port)
{assert(family == AF_INET);sockaddr_in addr;memset(&addr, 0, sizeof(addr));addr.sin_family = family;inet_pton(AF_INET, host.c_str(), &addr.sin_addr);addr.sin_port = htons((uint16_t)port);return addr;
}// This function seems to not be currently used anywhere
sockaddr_in6 socket::create_ip6_sockaddr(unsigned host, short port)
{sockaddr_in6 addr;memset(&addr, 0, sizeof(addr));addr.sin6_family = AF_INET6;std::string host_str = std::to_string(host);inet_pton(AF_INET6, host_str.c_str(), &addr.sin6_addr);addr.sin6_port = htons((uint16_t)port);return addr;
}sockaddr_in6 socket::create_ip6_sockaddr(std::string host, short port)
{sockaddr_in6 addr;memset(&addr, 0, sizeof(addr));addr.sin6_family = AF_INET6;inet_pton(AF_INET6, host.c_str(), &addr.sin6_addr);addr.sin6_port = htons((uint16_t)port);return addr;
}sockaddr_in6 socket::create_ip6_sockaddr_any(short src_port) {sockaddr_in6 addr;memset(&addr, 0, sizeof(addr));addr.sin6_family = AF_INET6;addr.sin6_addr = in6addr_any;addr.sin6_port = htons(src_port);return addr;
}std::string socket::get_socket_path_string() const
{/*if (ipv6_) {return sockaddr_ip6_to_string(local_ip6_address_) + " -> " + sockaddr_ip6_to_string(remote_ip6_address_);}return sockaddr_to_string(local_address_) + " -> " + sockaddr_to_string(remote_address_);*/return "Not implemented";
}std::string socket::sockaddr_to_string(const sockaddr_in & addr)
{int addr_len = INET_ADDRSTRLEN;char* c_string = new char[INET_ADDRSTRLEN];memset(c_string, 0, INET_ADDRSTRLEN);char* addr_string = new char[addr_len];memset(addr_string, 0, addr_len);#ifdef WIN32PVOID pvoid_sin_addr = const_cast<PVOID>((void*)(&addr.sin_addr));inet_ntop(addr.sin_family, pvoid_sin_addr, addr_string, addr_len);
#elseinet_ntop(addr.sin_family, &addr.sin_addr, addr_string, addr_len);
#endifstd::string string(addr_string);string.append(":" + std::to_string(ntohs(addr.sin_port)));delete[] addr_string;return string;
}std::string socket::sockaddr_ip6_to_string(const sockaddr_in6 & addr6)
{char* c_string = new char[INET6_ADDRSTRLEN];memset(c_string, 0, INET6_ADDRSTRLEN);inet_ntop(AF_INET6, &addr6.sin6_addr, c_string, INET6_ADDRSTRLEN);std::string string(c_string);string.append(":" + std::to_string(ntohs(addr6.sin6_port)));delete[] c_string;return string;
}socket_t& socket::get_raw_socket()
{return socket_;
}rtp_error_t socket::install_handler(std::shared_ptr<std::atomic<std::uint32_t>> local_ssrc, void* arg, packet_handler_vec handler)
{handlers_mutex_.lock();if (!handler)return -1;socket_packet_handler hndlr;hndlr.arg = arg;hndlr.handler = handler;vec_handlers_.insert({ local_ssrc, hndlr });handlers_mutex_.unlock();return 0;
}rtp_error_t socket::remove_handler(std::shared_ptr<std::atomic<std::uint32_t>> local_ssrc){handlers_mutex_.lock();vec_handlers_.erase(local_ssrc);handlers_mutex_.unlock();return 0;
}rtp_error_t socket::__sendto(sockaddr_in & addr, sockaddr_in6 & addr6, bool ipv6, uint8_t * buf, size_t buf_len, int send_flags, int* bytes_sent)
{int nsend = 0;#ifndef _WIN32if (ipv6) {nsend = ::sendto(socket_, buf, buf_len, send_flags, (const struct sockaddr*)&addr6, sizeof(addr6));}else {nsend = ::sendto(socket_, buf, buf_len, send_flags, (const struct sockaddr*)&addr, sizeof(addr));}if (nsend == -1) {printf("Failed to send data: %s", strerror(errno));if (bytes_sent) {*bytes_sent = -1;}return RTP_SEND_ERROR;}
#elseDWORD sent_bytes = 0;WSABUF data_buf;data_buf.buf = (char*)buf;data_buf.len = (ULONG)buf_len;int result = -1;if (ipv6) {result = WSASendTo(socket_, &data_buf, 1, &sent_bytes, send_flags, (const struct sockaddr*)&addr6, sizeof(addr6), nullptr, nullptr);}else {result = WSASendTo(socket_, &data_buf, 1, &sent_bytes, send_flags, (const struct sockaddr*)&addr, sizeof(addr), nullptr, nullptr);}if (result == -1) {win_get_last_error();if (ipv6_) {printf("Failed to send to %s", sockaddr_ip6_to_string(addr6).c_str());}else {printf("Failed to send to %s", sockaddr_to_string(addr).c_str());}if (bytes_sent)*bytes_sent = -1;return -2;}nsend = sent_bytes;
#endifif (bytes_sent) {*bytes_sent = nsend;}#ifndef NDEBUG++sent_packets_;
#endif // !NDEBUGreturn 0;
}rtp_error_t socket::sendto(sockaddr_in & addr, sockaddr_in6 & addr6, uint8_t * buf, size_t buf_len, int send_flags, int* bytes_sent)
{return __sendto(addr, addr6, ipv6_, buf, buf_len, send_flags, bytes_sent);
}rtp_error_t socket::sendto(sockaddr_in & addr, sockaddr_in6 & addr6, uint8_t * buf, size_t buf_len, int send_flags)
{return __sendto(addr, addr6, ipv6_, buf, buf_len, send_flags, nullptr);
}rtp_error_t socket::__sendtov(sockaddr_in & addr,sockaddr_in6 & addr6,bool ipv6,buf_vec & buffers,int send_flags, int* bytes_sent
)
{
#ifndef _WIN32int sent_bytes = 0;for (size_t i = 0; i < buffers.size(); ++i) {chunks_[i].iov_len = buffers.at(i).first;chunks_[i].iov_base = buffers.at(i).second;sent_bytes += buffers.at(i).first;}if (ipv6) {header_.msg_hdr.msg_name = (void*)&addr6;header_.msg_hdr.msg_namelen = sizeof(addr6);}else {header_.msg_hdr.msg_name = (void*)&addr;header_.msg_hdr.msg_namelen = sizeof(addr);}header_.msg_hdr.msg_iov = chunks_;header_.msg_hdr.msg_iovlen = buffers.size();header_.msg_hdr.msg_control = 0;header_.msg_hdr.msg_controllen = 0;if (sendmmsg(socket_, &header_, 1, send_flags) < 0) {printf("Failed to send RTP frame: %s!", strerror(errno));set_bytes(bytes_sent, -1);return RTP_SEND_ERROR;}
#elseDWORD sent_bytes = 0;// DWORD corresponds to uint16 on most platformsif (buffers.size() > UINT16_MAX){printf("Trying to send too large buffer");return -1;}/* create WSABUFs from input buffers and send them at once */for (size_t i = 0; i < buffers.size(); ++i) {buffers_[i].len = (ULONG)buffers.at(i).first;buffers_[i].buf = (char*)buffers.at(i).second;}int success = 0;if (ipv6) {success = WSASendTo(socket_, buffers_, (DWORD)buffers.size(), &sent_bytes, send_flags,(SOCKADDR*)&addr6, sizeof(addr6), nullptr, nullptr);}else {success = WSASendTo(socket_, buffers_, (DWORD)buffers.size(), &sent_bytes, send_flags,(SOCKADDR*)&addr, sizeof(addr), nullptr, nullptr);}if (success != 0) {win_get_last_error();if (ipv6_) {printf("Failed to send to %s", sockaddr_ip6_to_string(addr6).c_str());}else {printf("Failed to send to %s", sockaddr_to_string(addr).c_str());}set_bytes(bytes_sent, -1);return -1;}#endif#ifndef NDEBUG++sent_packets_;
#endif // !NDEBUGset_bytes(bytes_sent, sent_bytes);return 0;
}rtp_error_t socket::sendto(uint32_t ssrc, sockaddr_in & addr, sockaddr_in6 & addr6, buf_vec & buffers, int send_flags)
{rtp_error_t ret = 0;std::lock_guard<std::mutex> lg(handlers_mutex_);for (auto& handler : vec_handlers_) {if (handler.first.get()->load() != ssrc) {continue;}if ((ret = (*handler.second.handler)(handler.second.arg, buffers)) != 0) {printf("Malformed packet");return ret;}}// buf_vecreturn __sendtov(addr, addr6, ipv6_, buffers, send_flags, nullptr);
}rtp_error_t socket::sendto(uint32_t ssrc,sockaddr_in & addr,sockaddr_in6 & addr6,buf_vec & buffers,int send_flags, int* bytes_sent
)
{rtp_error_t ret = 0;std::lock_guard<std::mutex> lg(handlers_mutex_);for (auto& handler : vec_handlers_) {if (handler.first.get()->load() != ssrc) {continue;}if ((ret = (*handler.second.handler)(handler.second.arg, buffers)) != 0) {printf("Malformed packet");return ret;}}// buf_vecreturn __sendtov(addr, addr6, ipv6_, buffers, send_flags, bytes_sent);
}rtp_error_t socket::__sendtov(sockaddr_in & addr,sockaddr_in6 & addr6,bool ipv6,pkt_vec & buffers,int send_flags, int* bytes_sent
)
{rtp_error_t return_value = 0;int sent_bytes = 0;#ifndef _WIN32struct mmsghdr* headers = new struct mmsghdr[buffers.size()];struct mmsghdr* hptr = headers;for (size_t i = 0; i < buffers.size(); ++i) {headers[i].msg_hdr.msg_iov = new struct iovec[buffers[i].size()];headers[i].msg_hdr.msg_iovlen = buffers[i].size();if (ipv6) {headers[i].msg_hdr.msg_name = (void*)&addr6;headers[i].msg_hdr.msg_namelen = sizeof(addr6);}else {headers[i].msg_hdr.msg_name = (void*)&addr;headers[i].msg_hdr.msg_namelen = sizeof(addr);}headers[i].msg_hdr.msg_control = 0;headers[i].msg_hdr.msg_controllen = 0;for (size_t k = 0; k < buffers[i].size(); ++k) {headers[i].msg_hdr.msg_iov[k].iov_len = buffers[i][k].first;headers[i].msg_hdr.msg_iov[k].iov_base = buffers[i][k].second;sent_bytes += buffers[i][k].first;}}ssize_t npkts = (rce_flags_ & RCE_SYSTEM_CALL_CLUSTERING) ? 1024 : 1;ssize_t bptr = buffers.size();while (bptr > npkts) {if (sendmmsg(socket_, hptr, npkts, send_flags) < 0) {log_platform_error("sendmmsg(2) failed");return_value = RTP_SEND_ERROR;break;}bptr -= npkts;hptr += npkts;}if (return_value == RTP_OK){if (sendmmsg(socket_, hptr, bptr, send_flags) < 0) {log_platform_error("sendmmsg(2) failed");return_value = RTP_SEND_ERROR;}}for (size_t i = 0; i < buffers.size(); ++i){if (headers[i].msg_hdr.msg_iov){delete[] headers[i].msg_hdr.msg_iov;}}delete[] headers;#elseINT ret = 0;WSABUF wsa_bufs[WSABUF_SIZE];for (auto& buffer : buffers) {if (buffer.size() > WSABUF_SIZE) {printf("Input vector to __sendtov() has more than %u elements!", WSABUF_SIZE);return_value = -1;break;}/* create WSABUFs from input buffer and send them at once */for (size_t i = 0; i < buffer.size(); ++i) {wsa_bufs[i].len = (ULONG)buffer.at(i).first;wsa_bufs[i].buf = (char*)buffer.at(i).second;}send_:DWORD sent_bytes_dw = 0;if (ipv6) {ret = WSASendTo(socket_,wsa_bufs,(DWORD)buffer.size(),&sent_bytes_dw,send_flags,(SOCKADDR*)&addr6,sizeof(addr6),nullptr,nullptr);}else {ret = WSASendTo(socket_,wsa_bufs,(DWORD)buffer.size(),&sent_bytes_dw,send_flags,(SOCKADDR*)&addr,sizeof(addr),nullptr,nullptr);}sent_bytes = sent_bytes_dw;if (ret == SOCKET_ERROR) {int error = WSAGetLastError();if (error == WSAEWOULDBLOCK) {printf("WSASendTo would block, trying again after 3 ms");std::this_thread::sleep_for(std::chrono::milliseconds(3));goto send_;}else{printf("WSASendTo failed with error %li", error);if (ipv6_) {printf("Failed to send to %s", sockaddr_ip6_to_string(addr6).c_str());}else {printf("Failed to send to %s", sockaddr_to_string(addr).c_str());}}sent_bytes = -1;return_value = -1;break;}}
#endif#ifndef NDEBUGsent_packets_ += buffers.size();
#endif // !NDEBUGset_bytes(bytes_sent, sent_bytes);return return_value;
}rtp_error_t socket::sendto(uint32_t ssrc, sockaddr_in & addr, sockaddr_in6 & addr6, pkt_vec & buffers, int send_flags)
{rtp_error_t ret = 0;for (auto& buffer : buffers) {std::lock_guard<std::mutex> lg(handlers_mutex_);for (auto& handler : vec_handlers_) {if (handler.first.get()->load() != ssrc) {continue;}if ((ret = (*handler.second.handler)(handler.second.arg, buffer)) != 0) {printf("Malformed packet");return ret;}}}return __sendtov(addr, addr6, ipv6_, buffers, send_flags, nullptr);
}rtp_error_t socket::sendto(uint32_t ssrc, sockaddr_in & addr, sockaddr_in6 & addr6, pkt_vec & buffers, int send_flags, int* bytes_sent)
{rtp_error_t ret = 0;for (auto& buffer : buffers) {std::lock_guard<std::mutex> lg(handlers_mutex_);for (auto& handler : vec_handlers_) {if (handler.first.get()->load() != ssrc) {continue;}if ((ret = (*handler.second.handler)(handler.second.arg, buffer)) != 0) {printf("Malformed packet");return ret;}}}return __sendtov(addr, addr6, ipv6_, buffers, send_flags, bytes_sent);
}rtp_error_t socket::__recv(uint8_t * buf, size_t buf_len, int recv_flags, int* bytes_read)
{if (!buf || !buf_len) {set_bytes(bytes_read, -1);return -1;}#ifndef _WIN32int32_t ret = ::recv(socket_, buf, buf_len, recv_flags);if (ret == -1) {if (errno == EAGAIN || errno == EINTR) {set_bytes(bytes_read, 0);return RTP_INTERRUPTED;}printf("recv(2) failed: %s", strerror(errno));set_bytes(bytes_read, -1);return RTP_GENERIC_ERROR;}set_bytes(bytes_read, ret);
#else(void)recv_flags;WSABUF DataBuf;DataBuf.len = (u_long)buf_len;DataBuf.buf = (char*)buf;DWORD bytes_received = 0;DWORD d_recv_flags = 0;int rc = ::WSARecv(socket_, &DataBuf, 1, &bytes_received, &d_recv_flags, NULL, NULL);if (rc == SOCKET_ERROR) {int err = WSAGetLastError();if (err == WSA_IO_PENDING || err == WSAEWOULDBLOCK) {set_bytes(bytes_read, 0);return 0;}//log_platform_error("WSARecv() failed");set_bytes(bytes_read, -1);return -1;}set_bytes(bytes_read, bytes_received);
#endif#ifndef NDEBUG++received_packets_;
#endif // !NDEBUGreturn 0;
}rtp_error_t socket::recv(uint8_t * buf, size_t buf_len, int recv_flags)
{return socket::__recv(buf, buf_len, recv_flags, nullptr);
}rtp_error_t socket::recv(uint8_t * buf, size_t buf_len, int recv_flags, int* bytes_read)
{return socket::__recv(buf, buf_len, recv_flags, bytes_read);
}rtp_error_t socket::__recvfrom(uint8_t * buf, size_t buf_len, int recv_flags, sockaddr_in * sender, int* bytes_read)
{socklen_t* len_ptr = nullptr;socklen_t len = sizeof(sockaddr_in);if (sender)len_ptr = &len;#ifndef _WIN32int32_t ret = ::recvfrom(socket_, buf, buf_len, recv_flags, (struct sockaddr*)sender, len_ptr);if (ret == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) {set_bytes(bytes_read, 0);return RTP_INTERRUPTED;}printf("recvfrom failed: %s", strerror(errno));set_bytes(bytes_read, -1);return RTP_GENERIC_ERROR;}set_bytes(bytes_read, ret);
#else(void)recv_flags;WSABUF DataBuf;DataBuf.len = (u_long)buf_len;DataBuf.buf = (char*)buf;DWORD bytes_received = 0;DWORD d_recv_flags = 0;int rc = ::WSARecvFrom(socket_, &DataBuf, 1, &bytes_received, &d_recv_flags, (SOCKADDR*)sender, (int*)len_ptr, NULL, NULL);if (WSAGetLastError() == WSAEWOULDBLOCK)return 0;int err = 0;if ((rc == SOCKET_ERROR) && (WSA_IO_PENDING != (err = WSAGetLastError()))) {/* win_get_last_error(); */set_bytes(bytes_read, -1);return -1;}set_bytes(bytes_read, bytes_received);
#endif#ifndef NDEBUG++received_packets_;
#endif // !NDEBUGreturn 0;
}rtp_error_t socket::__recvfrom_ip6(uint8_t * buf, size_t buf_len, int recv_flags, sockaddr_in6 * sender, int* bytes_read)
{socklen_t* len_ptr = nullptr;socklen_t len = sizeof(sockaddr_in6);if (sender)len_ptr = &len;#ifndef _WIN32int32_t ret = ::recvfrom(socket_, buf, buf_len, recv_flags, (struct sockaddr*)sender, len_ptr);if (ret == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) {set_bytes(bytes_read, 0);return RTP_INTERRUPTED;}printf("recvfrom failed: %s", strerror(errno));set_bytes(bytes_read, -1);return RTP_GENERIC_ERROR;}set_bytes(bytes_read, ret);
#else(void)recv_flags;WSABUF DataBuf;DataBuf.len = (u_long)buf_len;DataBuf.buf = (char*)buf;DWORD bytes_received = 0;DWORD d_recv_flags = 0;int rc = ::WSARecvFrom(socket_, &DataBuf, 1, &bytes_received, &d_recv_flags, (SOCKADDR*)sender, (int*)len_ptr, NULL, NULL);if (WSAGetLastError() == WSAEWOULDBLOCK)return 0;int err = 0;if ((rc == SOCKET_ERROR) && (WSA_IO_PENDING != (err = WSAGetLastError()))) {/* win_get_last_error(); */set_bytes(bytes_read, -1);return -1;}set_bytes(bytes_read, bytes_received);
#endif#ifndef NDEBUG++received_packets_;
#endif // !NDEBUGreturn 0;
}rtp_error_t socket::recvfrom(uint8_t * buf, size_t buf_len, int recv_flags, sockaddr_in * sender,sockaddr_in6 * sender6, int* bytes_read)
{if (ipv6_) {return __recvfrom_ip6(buf, buf_len, recv_flags, sender6, bytes_read);}return __recvfrom(buf, buf_len, recv_flags, sender, bytes_read);
}rtp_error_t socket::recvfrom(uint8_t * buf, size_t buf_len, int recv_flags, int* bytes_read)
{if (ipv6_) {return __recvfrom_ip6(buf, buf_len, recv_flags, nullptr, bytes_read);}return __recvfrom(buf, buf_len, recv_flags, nullptr, bytes_read);
}rtp_error_t socket::recvfrom(uint8_t * buf, size_t buf_len, int recv_flags, sockaddr_in * sender)
{return __recvfrom(buf, buf_len, recv_flags, sender, nullptr);
}rtp_error_t socket::recvfrom(uint8_t * buf, size_t buf_len, int recv_flags)
{return __recvfrom(buf, buf_len, recv_flags, nullptr, nullptr);
}
如果只需要在windows下使用,那就简单一些,抽取一些代码,包装成windows下可以使用的类
socket windows 封装
只需要一个头文件,应急的使用可以使用,代码如下,适用于windows
class c_sock
{
private:///socket端口int m_port;///socket句柄SOCKET m_socket;///socket类型int m_SockType;
public:///本定绑定的地址信息。struct sockaddr_in m_SockAddr; public:///构造函数c_sock(int port = 0) {m_socket = INVALID_SOCKET; m_port = port;}virtual ~c_sock() { Close();}public:///初始化Windows Socket Lib的函数。static BOOL Initialize(){//WSADATA wsaData;//return !WSAStartup(0x101, &wsaData);unsigned short wVersionReq = MAKEWORD(2, 2);WSADATA wsaData;return !WSAStartup(wVersionReq, &wsaData);}///清理Windows socket lib的函数。static void Free(){ WSACleanup();}public:///将当前的socket设置为阻塞或非模式BOOL SetNonBlock(BOOL isnonblock){if (m_socket == INVALID_SOCKET)return FALSE;if (ioctlsocket(m_socket,FIONBIO,(unsigned long *)&isnonblock) == -1)return FALSE;elsereturn TRUE;}///设置当前的socket可以重用地址功能。BOOL SetReuseAddr(BOOL isreuse){if (m_socket == INVALID_SOCKET)return FALSE;if ( setsockopt(m_socket,SOL_SOCKET,SO_REUSEADDR,(char *)&isreuse,sizeof(int)) == -1 )return FALSE;elsereturn TRUE;}///打开该socket上的keepalive选项,是的tcp协议栈检测keepalive信息。BOOL SetKeepAlive(BOOL iskeep){if (m_socket == INVALID_SOCKET)return FALSE;if ( setsockopt(m_socket,SOL_SOCKET,SO_KEEPALIVE,(char *)&iskeep,sizeof(int)) == -1 )return FALSE;elsereturn TRUE;}BOOL SetBroadCastEnable(BOOL isBroadCast= TRUE){assert(m_SockType == SOCK_DGRAM);if(setsockopt(m_socket,SOL_SOCKET,SO_BROADCAST,(const char *)&isBroadCast,sizeof(int)) == SOCKET_ERROR)return FALSE;return TRUE;}///根据指定的类型创建udp或tcp socket。virtual BOOL Create(int sockettype = SOCK_STREAM , int protocoltype = 0 ){m_SockType = sockettype;m_socket = socket(PF_INET,sockettype,protocoltype);if (m_socket == INVALID_SOCKET)return FALSE;int optVal = 1024*1024*2 ;//2M字节的缓冲int optLen = sizeof(int);setsockopt(m_socket, SOL_SOCKET,SO_RCVBUF,(char*)&optVal,optLen );return TRUE;}///tcp发送信息。virtual BOOL Send(const void *buf,int buflen){assert(m_socket != INVALID_SOCKET);return (send(m_socket, (LPSTR)buf, buflen, 0) != SOCKET_ERROR);}int SendTo(const void *buf, int buflen, SOCKADDR_IN &addr){return (::sendto(m_socket, (LPSTR)buf, buflen, 0, (sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR);}int SendTo(const void* buf, int buflen){return (::sendto(m_socket, (LPSTR)buf, buflen, 0, (sockaddr*)&m_SockAddr,sizeof(m_SockAddr)) != SOCKET_ERROR);}///udp发送信息。virtual int SendTo(const void *buf,int buflen,const char* destination,short port){SOCKADDR_IN dst;assert(m_socket != INVALID_SOCKET);memset(&dst,0,sizeof(dst));dst.sin_port = htons(port);dst.sin_family = AF_INET;dst.sin_addr.s_addr = inet_addr(destination);return (::sendto(m_socket,(LPSTR)buf,buflen,0,(sockaddr*)&dst,sizeof(dst)) != SOCKET_ERROR);}///tcp接收信息virtual int Recv(void *buf,int buflen){//assert(m_socket != INVALID_SOCKET);return recv(m_socket, (LPSTR)buf, buflen, 0);}virtual int RecvFrom(void * buf,int buflen){sockaddr_in SenderAddr;int SenderAddrSize = sizeof(SenderAddr);return recvfrom(m_socket, (char*)buf, buflen, 0, (SOCKADDR *)&SenderAddr, &SenderAddrSize);}///作为tcp服务器或着使用udp方式通讯时,将socket绑定到端口。virtual BOOL Bind(int port = 0){SOCKADDR_IN sockAddr;assert(m_socket != INVALID_SOCKET);memset(&sockAddr,0,sizeof(sockAddr));sockAddr.sin_family = AF_INET;sockAddr.sin_addr.s_addr = htonl(INADDR_ANY);if(port != 0){m_port = port;sockAddr.sin_port = htons((u_short)m_port);}
#ifdef _WIN32return (SOCKET_ERROR != ::bind(m_socket, (struct sockaddr*)&sockAddr, sizeof(sockAddr)));
#elsereturn (SOCKET_ERROR != bind(m_socket, (struct sockaddr*)&sockAddr, sizeof(sockAddr)));
#endif}///为当前WASocket对象的socket句柄赋值void Attach(SOCKET soc) { m_socket = soc; }///返回当前的socket句柄,并将socket从当前对象中释放。SOCKET Dettach(){ SOCKET tmp = m_socket;m_socket = INVALID_SOCKET;return tmp;}///得到当前对象的socket句柄。SOCKET GetSocket() { return m_socket;}///作为服务器时开始监听客户端链接。BOOL ListenToClient(int backlog = 5){assert(m_socket != INVALID_SOCKET);return (SOCKET_ERROR != listen(m_socket, backlog));}///关闭当前socket。virtual void Close(){if(m_socket != INVALID_SOCKET){closesocket(m_socket);m_socket = INVALID_SOCKET;}m_SockType = 0;}///接收客户端链接。BOOL AcceptClient(c_sock& client,const void* buf = NULL,int buflen = 0){SOCKET soc;assert(m_socket != INVALID_SOCKET);soc = accept(m_socket,NULL,NULL);if(soc == INVALID_SOCKET)return FALSE;if(buf != NULL && buflen > 0)send(soc,(const char*)buf,buflen,0);client.Attach(soc);return TRUE;}///作为客户端时,链接到指定的server。virtual int Connect(const char *dest, UINT nHostPort){SOCKADDR_IN sockAddr;assert(m_socket != INVALID_SOCKET);memset(&sockAddr,0,sizeof(sockAddr));/*struct in_addr dst;int i = InetPtonA(AF_INET, dest, &dst);if (1 == i){}*/sockAddr.sin_family = AF_INET;sockAddr.sin_addr.s_addr = inet_addr(dest);
#if 0if (sockAddr.sin_addr.s_addr == INADDR_NONE){LPHOSTENT lphost;lphost = gethostbyname(dest);if (lphost != NULL)sockAddr.sin_addr.s_addr = ((LPIN_ADDR)lphost->h_addr)->s_addr;else{WSASetLastError(WSAEINVAL);return FALSE;}}
#endifsockAddr.sin_port = htons((u_short)nHostPort);return (connect(m_socket,(struct sockaddr*)&sockAddr, sizeof(sockAddr)) != SOCKET_ERROR);}virtual int Connect(unsigned long HostAddress,UINT nHostPort){SOCKADDR_IN sockAddr;assert(m_socket != INVALID_SOCKET);memset(&sockAddr,0,sizeof(sockAddr));sockAddr.sin_family = AF_INET;sockAddr.sin_addr.s_addr = HostAddress;sockAddr.sin_port = htons((u_short)nHostPort);return (connect(m_socket,(struct sockaddr*)&sockAddr, sizeof(sockAddr)) != SOCKET_ERROR);}///这个方法是从pj naughter的SMTPSocket中移植过来的,目的是为了能够使用WASocket替换其SMTPSocket。BOOL IsReadible(BOOL& bReadible){timeval timeout = {0, 0};fd_set fds;FD_ZERO(&fds);FD_SET(m_socket, &fds);int nStatus = select(0, &fds, NULL, NULL, &timeout);if (nStatus == SOCKET_ERROR){return FALSE;}else{bReadible = !(nStatus == 0);return TRUE;}}
};
使用udp方式例子
c_sock v_sock;//c_multisock v_mulsock;//the dest ip port struct
SOCKADDR_IN v_sockaddr;
c_sock::Initialize();
v_sock.Create( SOCK_DGRAM);
memset(&v_sockaddr, 0, sizeof(v_sockaddr));
v_sockaddr.sin_port = htons(destport);
v_sockaddr.sin_family = AF_INET;
v_sockaddr.sin_addr.s_addr = inet_addr(destip);