6.python网络编程

文章目录

    • 1.生产者消费者-生成器版
    • 2.生产者消费者--异步版本
    • 3.客户端/服务端-多线程版
    • 4.IO多路复用TCPServer模型
      • 4.1Select
      • 4.2Epoll
    • 5.异步IO多路复用TCPServer模型

1.生产者消费者-生成器版

import time# 消费者
def consumer():cnt = yieldwhile True:if cnt <= 0:# 暂停、让出CPUcnt = yield cntcnt -= 1time.sleep(1)print('consumer consum 1 cnt. cnt =', cnt)# 生产者 (调度器)
def producer(cnt):gen = consumer()# 激活生成器next(gen)gen.send(cnt)while True:cnt += 1print('producer producer 5 cnt. cnt =', cnt)# 调度消费者current = int(time.time())if current % 5 == 0:cnt = gen.send(cnt)else:time.sleep(1)if __name__ == '__main__':producer(0)

2.生产者消费者–异步版本

import asyncio
import time
from queue import Queue
from threading import Threaddef start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def do_sleep(x, queue):await asyncio.sleep(x)queue.put('ok')def consumer(input_queue1, out_queue1):while True:task = input_queue1.get()if not task:time.sleep(1)continueasyncio.run_coroutine_threadsafe(do_sleep(int(task), out_queue1), new_loop)if __name__ == '__main__':print(time.ctime())new_loop = asyncio.new_event_loop()loop_thread = Thread(target=start_loop, args=(new_loop,))loop_thread.daemon = Trueloop_thread.start()input_queue = Queue()input_queue.put(5)input_queue.put(3)input_queue.put(1)out_queue = Queue()consumer_thread = Thread(target=consumer, args=(input_queue, out_queue,))consumer_thread.daemon = Trueconsumer_thread.start()while True:msg = out_queue.get()print("协程运行完...")print("当前时间:", time.ctime())

3.客户端/服务端-多线程版

客户端/服务模型

在这里插入图片描述

客户端

# -*- encoding=utf-8 -*-
# 客户端import socketclient = socket.socket()
print('client.fileno:', client.fileno())client.connect(('127.0.0.1', 8999))while True:content = str(input('>>>'))client.send(content.encode())content = client.recv(1024)print('client recv content:', content)

服务端

import socket
import threadingdef thread_process(s):while True:content = s.recv(1024)if len(content) == 0:breaks.send(content.upper())print(str(content, encoding='utf-8'))  # 接受来自客户端的消息,并打印出来s.close()server = socket.socket()  # 1. 新建socket
server.bind(('127.0.0.1', 8999))  # 2. 绑定IP和端口(其中127.0.0.1为本机回环IP)
server.listen(5)  # 3. 监听连接while True:s, addr = server.accept()  # 4. 接受连接new_thread = threading.Thread(target=thread_process, args=(s,))print('new thread process connect addr:{}'.format(addr))new_thread.start()

注意:

  • AddressFamily=AF_INET:(用于 Internet 进程间通信)

  • AddressFamily=AF_UNIX(用于同一台机器进程间通信)

  • 现象:报错[WinError 10038],原因分析:socket 先 close 再调 recv 就会报错,解决办法:if not tcpCliSock._closed:

4.IO多路复用TCPServer模型

4.1Select

服务端

import select
import socket
from queue import Queue, Empty
from time import sleepserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setblocking(False)
server_address = ("127.0.0.1", 8999)
print('starting up on %s port %s' % server_address)
server.bind(server_address)
server.listen(5)
inputs = [server]
outputs = []
message_queues = {}while inputs:print('waiting for the next event')readable, writable, exceptional = select.select(inputs, outputs, inputs)for s in readable:if s is server:connection, client_address = s.accept()print(f"connection from {client_address}")connection.setblocking(False)inputs.append(connection)message_queues[connection] = Queue()continuedata = s.recv(1024).decode()if data == "":print(f'closing:{s.getpeername()}')if s in outputs:outputs.remove(s)inputs.remove(s)s.close()message_queues.pop(s)continueprint(f'received {data} from {s.getpeername()} ')message_queues[s].put(data)if s not in outputs:outputs.append(s)for s in writable:try:queue_item = message_queues.get(s)send_data = ''if queue_item:send_data = queue_item.get_nowait()except Empty:print(outputs.remove(s))print(f"{s.getpeername()} has closed")else:if queue_item:s.send(send_data.encode())for s in exceptional:print(f"Exception condition on {s.getpeername}")inputs.remove(s)if s in outputs:outputs.remove(s)s.close()message_queues.pop(s)sleep(1)

客户端

import socketmessages = ['This is the message ', 'It will be sent ', 'in parts ', ]
server_address = ("127.0.0.1", 8999)
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ]
print('connecting to %s port %s' % server_address)
for s in socks:s.connect(server_address)for index, message in enumerate(messages):for s in socks:print('%s: sending "%s"' % (s.getsockname(), message + str(index)))s.send((message + str(index)).encode('utf-8'))for s in socks:data = s.recv(1024)print('%s: received "%s"' % (s.getsockname(), data))if data != "":print('closing socket', s.getsockname())s.close()
  • 为什么要将server放入到inputs中

在select模型中,将server放入到inputs中,当执行select时就会去检查server是否可读,就说明在缓冲区里有数据,对于server来说,有连接进入。使用accept获得客户端socket文件后,首先要放入到inputs当中,等待其发送消息。

  • readable

select会将所有可读的socket返回,包括server在内,假设一个客户端socket的缓冲区里有2000字节的内容,而这一次你只是读取了1024个字节,没有关系,下一次执行select模型时,由于缓冲区里还有数据,这个客户端socket还会被放入到readable列表中。因此,在读取数据时,不必再像之前那样使用一个while循环一直读取。

  • writable

在每一次写操作执行后,都从socket从writable中删除,这样做的原因很简单,该写的数据已经写完了,如果不删除,下一次select操作时,又会把他放入到writable中,可是现在已经没有数据需要写了啊,这样做没有意义,只会浪费select操作的时间,因为它要遍历outputs中的每一个socket,判断他们是否可写以决定是否将其放入到writtable中

  • 异常

在exceptional中,是发生错误和异常的socket,有了这个数组,就在也不用操心错误和异常了,不然程序写起来非常的复杂,有了统一的管理,发生错误后的清理工作将变得非常简单

4.2Epoll

服务端

# -*- encoding=utf-8 -*-
# IO多路复用TCPServer模型import select
import socketdef serve():server = socket.socket()server.bind(('127.0.0.1', 8999))server.listen(1)epoll = select.epoll()epoll.register(server.fileno(), select.EPOLLIN)connections = {}contents = {}while True:events = epoll.poll(10)for fileno, event in events:if fileno == server.fileno():# 当fd为当前服务器的描述符时,获取新连接s, addr = server.accept()  # 获取套接字和地址print(f"new connection from addr:{addr},fileno:{s.fileno()},socket:{s}")epoll.register(s.fileno(), select.EPOLLIN)connections[s.fileno()] = selif event == select.EPOLLIN:# 当fd不为服务器描述符为客户端描述符时,读事件就绪,有新数据可读s = connections[fileno]content = s.recv(1024)if content:# 当客户端发送数据时print(f"recv content is {content}")print(f"fileno:{fileno} event:{event}")epoll.modify(fileno, select.EPOLLOUT)contents[fileno] = contentelse:# 当客户端退出连接时print(f"recv content is null")print(f"fileno;{fileno} event:{event} ")epoll.unregister(fileno)s.close()connections.pop(fileno)elif event == select.EPOLLOUT:# 当fd不为服务器描述符为客户端描述符时,写事件就绪try:content = contents[fileno]s = connections[fileno]s.send(content)epoll.modify(s.fileno(), select.EPOLLIN)print(f"modify content is {content}")print(f"fileno;{fileno} event:{event} ")except Exception as error:epoll.unregister(fileno)s.close()connections.pop(fileno)contents.pop(fileno)print(f"modify content is failed")print(f"fileno;{fileno} event:{event} ")if __name__ == '__main__':serve()

客户端

# -*- encoding=utf-8 -*-
# 客户端import socketclient = socket.socket()
print('client.fileno:', client.fileno())client.connect(('127.0.0.1', 8999))while True:content = str(input('>>>'))client.send(content.encode())content = client.recv(1024)print('client recv content:', content.decode())

5.异步IO多路复用TCPServer模型

import socket
import select
from collections import dequeclass Future:"""可等待对象 Future"""def __init__(self, loop):self.loop = loopself.done = Falseself.co = Nonedef set_done(self):self.done = Truedef set_coroutine(self, co):self.co = codef __await__(self):if not self.done:yield selfreturnclass SocketWrapper:"""套接字协程适配器"""def __init__(self, sock: socket.socket, loop):self.loop = loopself.sock = sockself.sock.setblocking(False)self.fileno = self.sock.fileno()def create_future_for_events(self, events):future: Future = Future(loop=self.loop)def handler():future.set_done()self.loop.unregister_handler(self.fileno)if future.co:self.loop.add_coroutine(future.co)self.loop.register_handler(self.fileno, events, handler)return futureasync def accept(self):while True:try:sock, addr = self.sock.accept()return SocketWrapper(sock, self.loop), addrexcept BlockingIOError:future = self.create_future_for_events(select.EPOLLIN)await futureasync def recv(self, backlog):while True:try:return self.sock.recv(backlog)except BlockingIOError:future = self.create_future_for_events(select.EPOLLIN)await futureasync def send(self, data):while True:try:return self.sock.send(data)except BlockingIOError:future = self.create_future_for_events(select.EPOLLOUT)await futureclass EventLoop:"""调度器:epoll事件驱动"""current = Nonerunnable = deque()epoll = select.epoll()handler = {}@classmethoddef instance(cls):if not EventLoop.current:EventLoop.current = EventLoop()return EventLoop.currentdef register_handler(self, fileno, events, handler):self.handler[fileno] = handlerself.epoll.register(fileno, events)def unregister_handler(self, fileno):self.epoll.unregister(fileno)self.handler.pop(fileno)def add_coroutine(self, co):self.runnable.append(co)def run_coroutine(self, co):try:future: Future = co.send(None)future.set_coroutine(co)except Exception as e:print(e)print('coroutine {} stopped'.format(co.__name__))def run_forever(self):while True:while self.runnable:self.run_coroutine(co=self.runnable.popleft())events = self.epoll.poll(1)for fileno, event in events:handler = self.handler.get(fileno)handler()class TCPServer:def __init__(self, loop: EventLoop):self.loop = loopself.listen_sock: SocketWrapper = self.create_listen_socket()self.loop.add_coroutine(self.serve_forever())def create_listen_socket(self, ip='localhost', port=8999):sock = socket.socket()sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)sock.bind((ip, port))sock.listen()return SocketWrapper(sock, self.loop)async def handler_client(self, sock: SocketWrapper):while True:data = await sock.recv(1024)if not data:print('client disconnected')breakawait sock.send(data.upper())async def serve_forever(self):while True:sock, addr = await self.listen_sock.accept()print(f'client connect addr = {addr}')self.loop.add_coroutine(self.handler_client(sock))if __name__ == '__main__':loop = EventLoop.instance()server = TCPServer(loop)loop.run_forever()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/324839.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

创新指南 | 生成式AI如何引领企业创新未来?

2023年麦肯锡全球数字战略调查了1000多名受访者&#xff0c;发现&#xff1a;建立创新文化的组织与它们应用包括生成式AI在内的最新数字技术提高产出的能力之间有着惊人的强关联。 本文探讨了顶尖创新企业采取的五项行动&#xff0c;使它们与同行之间拉开距离&#xff0c;并在使…

【Linux系统】进程

本篇博客整理了进程的多方面知识&#xff0c; 旨在从进程的概念、管理、属性、创建等方面让读者更加全面系统地理解进程和操作系统的管理设计。 目录 一、进程是什么 二、操作系统如何管理进程 1.描述进程 PCB 2.组织进程 3.再谈进程和进程管理 三、Linux下的进程管理 1…

Docker Compose:简化多容器应用部署

序言 在当今的软件开发中&#xff0c;容器化技术的使用已经很普遍了。而 Docker 作为其中最流行的容器化平台之一&#xff0c;为开发者提供了方便、快捷、一致的开发和部署环境。但是&#xff0c;当我们的应用开始变得更加复杂&#xff0c;涉及到多个容器时&#xff0c;手动管…

在 Kubernetes 上运行 Apache Spark 进行大规模数据处理的实践

在刚刚结束的 Kubernetes Community Day 上海站&#xff0c;亚马逊云科技在云原生分论坛分享的“在 Kunernets 上运行 Apache Spark 进行大规模数据处理实践”引起了现场参与者的关注。开发者告诉我们&#xff0c;为了充分利用 Kubernetes 的高可用设计、弹性&#xff0c;在越来…

AI + Web3 如何打造全新创作者经济模型?

可编程 IP 的兴起&#xff0c;借助人工智能极大提高创作效率和效能&#xff0c;让 Web3 用户体会到了自主创作和产品制作的乐趣。然而&#xff0c;你知道 AI 时代来临的背景下&#xff0c;创作者经济模型又该如何在 Web3 技术的加持下走向更成熟的运作轨道吗&#xff1f;第 43 …

再谈毕业论文设计投机取巧之IVR自动语音服务系统设计(信息与通信工程A+其实不难)

目录 举个IVR例子格局打开&#xff0c;万物皆能IVR IVR系统其实可盐可甜。还能可圈可点。 戎马一生&#xff0c;归来依然IVR。 举个IVR例子 以下是IVR系统的一个例子。 当您拨打电话进入IVR系统。 首先检验是否为工作时间。 如是&#xff0c;您将被送入ivr-lang阶段&#xff0…

QT day5 作业

服务器头文件 #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QTcpServer> //服务器类 #include <QTcpSocket> //客户端类 #include <QList> //链表类 #include <QMessageBox> //消息对话框类 #include <QDebu…

【C语言】精品练习题

目录 题目一&#xff1a; 题目二&#xff1a; 题目三&#xff1a; 题目四&#xff1a; 题目五&#xff1a; 题目六&#xff1a; 题目七&#xff1a; 题目八&#xff1a; 题目九&#xff1a; 题目十&#xff1a; 题目十一&#xff1a; 题目十二&#xff1a; 题目十…

大文件传输的好帮手Libarchive:功能强大的开源归档文件处理库

在数字化时代&#xff0c;文件的存储和传输对于企业的日常运作至关重要。但是&#xff0c;服务器中的压缩文件往往无法直接查看或预览&#xff0c;这给用户带来了不便。为了解决这一问题&#xff0c;在线解压功能的开发变得尤为重要。接下来&#xff0c;小编将介绍一个能够实现…

RabbitMQ(安装配置以及与SpringBoot整合)

文章目录 1.基本介绍2.Linux下安装配置RabbitMQ1.安装erlang环境1.将文件上传到/opt目录下2.进入/opt目录下&#xff0c;然后安装 2.安装RabbitMQ1.进入/opt目录&#xff0c;安装所需依赖2.安装MQ 3.基本配置1.启动MQ2.查看MQ状态3.安装web管理插件4.安装web管理插件超时的解决…

使用xtuner微调InternLM-Chat-7B

1. 安装xtuner #激活环境 source activate test_llm # 安装xtuner pip install xtuner#还有一些依赖项需要安装 future>0.6.0 cython lxml>3.1.0 cssselect mmengine 2. 创建一个ft-oasst1 数据集的工作路径&#xff0c;进入 mkdir ft-oasst1 cd ft-oasst1 3.XTune…

MySQL系列之索引

&#x1f339;作者主页&#xff1a;青花锁 &#x1f339;简介&#xff1a;Java领域优质创作者&#x1f3c6;、Java微服务架构公号作者&#x1f604; &#x1f339;简历模板、学习资料、面试题库、技术互助 &#x1f339;文末获取联系方式 &#x1f4dd; 往期热门专栏回顾 专栏…

【Linux】环境变量是什么?如何配置?详解

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;个人主页 &#xff1a;阿然成长日记 …

C++11:并发新纪元 —— 深入理解异步编程的力量(1)

hello &#xff01;大家好呀&#xff01; 欢迎大家来到我的Linux高性能服务器编程系列之《C11&#xff1a;并发新纪元 —— 深入理解异步编程的力量》&#xff0c;在这篇文章中&#xff0c;你将会学习到C新特性以及异步编程的好处&#xff0c;以及其如何带来的高性能的魅力&…

【算法】动态规划之背包DP问题(2024.5.11)

前言&#xff1a; 本系列是学习了董晓老师所讲的知识点做的笔记 董晓算法的个人空间-董晓算法个人主页-哔哩哔哩视频 (bilibili.com) 动态规划系列 【算法】动态规划之线性DP问题-CSDN博客 01背包 步骤&#xff1a; 分析容量j与w[i]的关系&#xff0c;然后分析是否要放…

OGG几何内核开发-BRepAlgoAPI_Fuse与BRep_Builder.MakeCompound比较

最近在与同事讨论BRepAlgoAPI_Fuse与BRep_Builder.MakeCompound有什么区别。 一、从直觉上来说&#xff0c;BRepAlgoAPI_Fuse会对两个实体相交处理&#xff0c;相交的部分会重新的生成相关的曲面。而BRep_Builder.MakeCompound仅仅是把两个实体组合成一个新的实体&#xff0c;…

JUC下的BlockingQueue详解

BlockingQueue是Java并发包(java.util.concurrent)中提供的一个接口&#xff0c;它扩展了Queue接口&#xff0c;增加了阻塞功能。这意味着当队列满时尝试入队操作&#xff0c;或者队列空时尝试出队操作&#xff0c;线程会进入等待状态&#xff0c;直到队列状态允许操作继续。这…

https://是怎么实现的?

默认的网站建设好后都是http访问模式&#xff0c;这种模式对于纯内容类型的网站来说&#xff0c;没有什么问题&#xff0c;但如果受到中间网络劫持会让网站轻易的跳转钓鱼网站&#xff0c;为避免这种情况下发生&#xff0c;所以传统的网站改为https协议&#xff0c;这种协议自己…

信息检索(35):LEXMAE: LEXICON-BOTTLENECKED PRETRAINING FOR LARGE-SCALE RETRIEVAL

LEXMAE: LEXICON-BOTTLENECKED PRETRAINING FOR LARGE-SCALE RETRIEVAL 标题摘要1 引言2 相关工作3 LEXMAE&#xff1a;词典瓶颈屏蔽自动编码器3.1 语言建模编码器3.2 词典瓶颈模块3.3 弱化掩蔽式解码器3.4 词汇加权检索器的预训练目标和微调 4 实验4.1 主要评估4.2 效率分析与…

利用OpenShift的ImageStream部署临时版本

公司是港企&#xff0c;项目都部署在OpenShift上统一管理&#xff0c;因为运行环境为香港网络(外网)&#xff0c;配置、中间件等大陆无法直接访问联通。因此在大陆开发时&#xff0c;测试是个很大的问题。为了避免往Git上频繁提交未确定可用的版本&#xff0c;选择用利用OpenSh…