C++项目实战——基于多设计模式下的同步异步日志系统-⑩-异步缓冲区类与异步工作器类设计

文章目录

  • 专栏导读
  • 异步缓冲区设计思想
  • 异步缓冲区类设计
  • 异步工作器类设计
  • 异步日志器设计
  • 异步缓冲区类整理
  • 异步工作器类整理

专栏导读

🌸作者简介:花想云 ,在读本科生一枚,C/C++领域新星创作者,新星计划导师,阿里云专家博主,CSDN内容合伙人…致力于 C/C++、Linux 学习。

🌸专栏简介:本文收录于 C++项目——基于多设计模式下的同步与异步日志系统

🌸相关专栏推荐:C语言初阶系列C语言进阶系列C++系列数据结构与算法Linux

在这里插入图片描述
为了避免因为写日志的过程阻塞,导致业务线程在写日志的时候影响其效率(例如由于网络原因导致日志写入阻塞,进而导致业务线程阻塞),因此我们需要设计一个异步日志器

异步的思想就是不让业务线程进行日志的实际落地操作,而是将日志消息放到缓冲区(一块指定内存)当中,接下来有一个专门的异步线程,去针对缓冲区中的数据进行处理(实际落地操作)

所以,异步日志器的实现思想:

  • 设计一个线程安全的缓冲区
  • 创建一个异步工作线程,专门负责缓冲区中日志消息落地操作。

异步缓冲区设计思想

在任务池的设计中,有很多备选方案,比如队列、循环队列等,但是不管哪一种都会涉及到锁冲突的情况,因为在生产者与消费者模型中,任何两个角色之间具有互斥关系,因此每一次任务的添加与取出都有可能涉及锁的冲突

所以我们采用双缓冲区的的设计思想,优势在于:

  • 避免了空间的频繁申请与释放,且尽可能的减少了生产者与消费则之间锁冲突的概率,提高了任务处理效率

双缓冲区的设计思想是:采用两个缓冲区,一个用来进行任务写入(push pool),一个进行任务处理(pop pool)。当异步工作线程(消费者)将缓冲区中的数据全部处理完毕之后,然后交换两个缓冲区,重新对新的缓冲区中的任务进行处理,虽然同时多线程写入也会产生冲突,但是冲突并不会像每次只处理一条的时候频繁(减少了消费者与生产者之间的锁冲突),且不涉及到空间的频繁申请释放所带来的的消耗。

在这里插入图片描述

异步缓冲区类设计

类中包含的成员:

  • 一个存放字符串数据的缓冲区(使用vector进行空间管理);
  • 当前写入数据位置的指针(指向可写区域的起始位置,避免数据的写入覆盖);
  • 当前读取数据位置的指针(指向可读区域的起始位置,当读取指针与写入指针指向相同的位置表示数据读取完了);

类中提供的操作:

  • 向缓冲区中写入数据
  • 获取可读数据起始地址的接口
  • 获取可读数据长度的接口
  • 移动读写位置的接口
  • 初始化缓冲区的操作(将读写位置初始化–在一个缓冲区所有数据处理完毕之后);
  • 提供交换缓冲区的操作(交换空间地址,并不交换空间数据)。

注意,缓冲区中直接存放格式化后的日志消息字符串,而不是LogMsg对象,这样做有两个好处:

  • 减少了LogMsg对象频繁的构造的消耗;
  • 可以针对缓冲区中的日志消息,一次性进行IO操作,减少IO次数,提高效率。
#ifndef __M_BUFFER_H__
#define __M_BUFFER_H__
#include "util.hpp"
#include <vector>
#include <cassert>namespace LOG
{#define DEFAULT_BUFFER_SIZE (1 * 1024 * 1024)#define THRESHOLD_BUFFER_SIZE (8 * 1024 * 1024)#define INCREMENT_BUFFER_SIZE (1 * 1024 * 1024)class Buffer {public:Buffer() : _buffer(DEFAULT_BUFFER_SIZE), _writer_idx(0), _reader_idx(0) {}// 向缓冲区中写入数据void push(const char* data, size_t len){// 1.考虑空间不够则扩容ensureEnoughSize(len);// 2.将数据拷贝到缓冲区std::copy(data, data + len, &_buffer[_writer_idx]);// 3.将当前写入位置向后偏移moveWriter(len);}// 返回可读数据的起始地址const char* begin(){return &_buffer[_reader_idx];}// 返回可写数据的长度size_t writeAbleSize(){// 对于扩容思路并没有用, 仅针对固定大小缓冲区return (_buffer.size() - _writer_idx);}// 返回可读数据的长度size_t readAbleSize(){ return (_writer_idx - _reader_idx);}// 重置读写位置void reset(){_writer_idx = 0;_reader_idx = 0;}// 对buffer实现交换操作void swap(Buffer & buffer){_buffer.swap(buffer._buffer);std::swap(_reader_idx, buffer._reader_idx);std::swap(_writer_idx, buffer._writer_idx);}// 判断缓冲区是否为空bool empty(){return (_reader_idx == _writer_idx);}private:void ensureEnoughSize(size_t len){if(len < writeAbleSize()) return;size_t new_size = 0;if(_buffer.size() < THRESHOLD_BUFFER_SIZE){new_size = _buffer.size() * 2 + len; // 小于阈值则翻倍增长}else{new_size = _buffer.size() + INCREMENT_BUFFER_SIZE + len;}_buffer.resize(new_size);}// 对读指针进行向后偏移操作void moveReader(size_t len){assert(len <= readAbleSize());_reader_idx += len;}// 对写指针进行向后偏移操作void moveWriter(size_t len){assert(len + _writer_idx<= _buffer.size());_writer_idx += len;}private:std::vector<char> _buffer;size_t _reader_idx; // 当前可读数据的指针size_t _writer_idx; // 当前可写数据的指针};
}
#endif

异步工作器类设计

异步工作器的主要任务是,对缓冲区中的数据进行处理,若处理缓冲区中没有数据了则交换缓冲区

异步工作器类管理的成员有:

  • 双缓冲区(生产,消费);
  • 互斥锁:保证线程安全;
  • 条件变量-生产&消费:生产缓冲区中没有数据,处理完消费缓冲区数据后就休眠;
  • 回调函数:针对缓冲区中数据的处理接口——外界传入一个函数,告诉异步日志器该如何处理。

异步工作器类提供的操作有:

  • 停止异步工作器
  • 添加数据到缓冲区

私有操作:

  • 创建线程
  • 线程入口函数:在线程入口函数中交换缓冲区,对消费缓冲区数据使用回调函数进行处理,处理完后再次交换;
#ifndef __M_LOOPER_H__
#define __M_LOOPER_H__
#include "buffer.hpp"
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>namespace LOG
{using Functor = std::function<void(Buffer &)>;enum class AsyncType{ASYNC_SAFE, // 安全状态, 表示缓冲区满了则阻塞,避免资源耗尽的风险ASYNC_UNSAFE // 不考虑资源耗尽的问题};class AsyncLooper{public:using ptr = std::shared_ptr<AsyncLooper>;AsyncLooper(const Functor& cb, AsyncType looper_type = AsyncType::ASYNC_SAFE):_looper_type(looper_type),_stop(false),_call_back(cb),_thread(std::thread(&AsyncLooper::threadEntry, this)){}~AsyncLooper() { stop(); }void stop() {_stop = true; // 将退出标志设置为true_con_cond.notify_all(); // 唤醒所有工作线程_thread.join(); // 等待工作线程退出}void push(const char* data, size_t len){std::unique_lock<std::mutex> lock(_mutex);// 条件变量控制,若缓冲区剩余空间大小等于数据长度,则可以添加数据if(_looper_type == AsyncType::ASYNC_SAFE)_pro_cond.wait(lock, [&](){ return _pro_buf.writeAbleSize() >= len; });// 能够走下来说明条件满足,可以向缓冲区添加数据了_pro_buf.push(data, len);// 唤醒消费者对缓冲区中的数据进行处理_con_cond.notify_one();}private:// 线程入口函数 -- 对消费者缓冲区中的数据进行处理,处理完毕后,初始化缓冲区,交换缓冲区void threadEntry(){// 为互斥锁设置一个声明周期,当缓冲区交换完毕就解锁while(1){{// 1.判断生产缓冲区有没有数据,有则交换,无则阻塞std::unique_lock<std::mutex> lock(_mutex);if(_stop && _pro_buf.empty()) break;// 若当前是退出前被唤醒或者是有数据被唤醒,则返回真,继续向下运行,否则重新进入休眠_con_cond.wait(lock, [&](){ return _stop || !_pro_buf.empty(); });_con_buf.swap(_pro_buf);// 2.唤醒生产者if(_looper_type == AsyncType::ASYNC_SAFE)_pro_cond.notify_all();}// 3.被唤醒后,对消费者缓冲区进行数据处理_call_back(_con_buf);// 4.初始化消费者缓冲区_con_buf.reset();}}private:Functor _call_back;private:AsyncType _looper_type;std::atomic<bool> _stop;Buffer _pro_buf;Buffer _con_buf;std::mutex _mutex;std::condition_variable _pro_cond;std::condition_variable _con_cond;std::thread _thread; // 异步工作器对应的工作线程};
}
#endif

异步日志器设计

异步日志器继承自日志器类,并在同步日志器类上拓展了异步工作器。当我们需要异步输出日志的时候,需要创建异步日志器和消息处理器,调用异步日志器的log、debug、error、info、fatal等函数输出不同级别日志。

  • log函数为重写Logger类的函数,主要实现将日志日志数据加入异步缓冲区;
  • realLog函数主要由异步线程调用(是为异步工作器设置的回调函数),完成日志的实际落地操作。
class AsyncLogger : public Logger
{
public:AsyncLogger(const std::string &logger_name,LogLevel::value level,LOG::Formatter::ptr &formatter,std::vector<LogSink::ptr> &sinks,AsyncType looper_type): Logger(logger_name, level, formatter, sinks),_looper(std::make_shared<AsyncLooper>(std::bind(&AsyncLogger::realLog, this, std::placeholders::_1), looper_type)){}// 将数据写入缓冲区 void log(const char *data, size_t len){_looper->push(data, len);}// 设计一个实际落地函数void realLog(Buffer &buf){if (_sinks.empty())return;for (auto &sink : _sinks){sink->log(buf.begin(), buf.readAbleSize());}}private:AsyncLooper::ptr _looper; // 异步工作器
};

异步缓冲区类整理

#ifndef __M_BUFFER_H__
#define __M_BUFFER_H__
#include "util.hpp"
#include <vector>
#include <cassert>namespace LOG
{#define DEFAULT_BUFFER_SIZE (1 * 1024 * 1024)#define THRESHOLD_BUFFER_SIZE (8 * 1024 * 1024)#define INCREMENT_BUFFER_SIZE (1 * 1024 * 1024)class Buffer {public:Buffer() : _buffer(DEFAULT_BUFFER_SIZE), _writer_idx(0), _reader_idx(0) {}// 向缓冲区中写入数据void push(const char* data, size_t len){// 1.考虑空间不够则扩容ensureEnoughSize(len);// 2.将数据拷贝到缓冲区std::copy(data, data + len, &_buffer[_writer_idx]);// 3.将当前写入位置向后偏移moveWriter(len);}// 返回可读数据的起始地址const char* begin(){return &_buffer[_reader_idx];}// 返回可写数据的长度size_t writeAbleSize(){// 对于扩容思路并没有用, 仅针对固定大小缓冲区return (_buffer.size() - _writer_idx);}// 返回可读数据的长度size_t readAbleSize(){ return (_writer_idx - _reader_idx);}// 重置读写位置void reset(){_writer_idx = 0;_reader_idx = 0;}// 对buffer实现交换操作void swap(Buffer & buffer){_buffer.swap(buffer._buffer);std::swap(_reader_idx, buffer._reader_idx);std::swap(_writer_idx, buffer._writer_idx);}// 判断缓冲区是否为空bool empty(){return (_reader_idx == _writer_idx);}private:void ensureEnoughSize(size_t len){if(len < writeAbleSize()) return;size_t new_size = 0;if(_buffer.size() < THRESHOLD_BUFFER_SIZE){new_size = _buffer.size() * 2 + len; // 小于阈值则翻倍增长}else{new_size = _buffer.size() + INCREMENT_BUFFER_SIZE + len;}_buffer.resize(new_size);}// 对读指针进行向后偏移操作void moveReader(size_t len){assert(len <= readAbleSize());_reader_idx += len;}// 对写指针进行向后偏移操作void moveWriter(size_t len){assert(len + _writer_idx<= _buffer.size());_writer_idx += len;}private:std::vector<char> _buffer;size_t _reader_idx; // 当前可读数据的指针size_t _writer_idx; // 当前可写数据的指针};
}
#endif

异步工作器类整理

#ifndef __M_LOOPER_H__
#define __M_LOOPER_H__
#include "buffer.hpp"
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>namespace LOG
{using Functor = std::function<void(Buffer &)>;enum class AsyncType{ASYNC_SAFE, // 安全状态, 表示缓冲区满了则阻塞,避免资源耗尽的风险ASYNC_UNSAFE // 不考虑资源耗尽的问题};class AsyncLooper{public:using ptr = std::shared_ptr<AsyncLooper>;AsyncLooper(const Functor& cb, AsyncType looper_type = AsyncType::ASYNC_SAFE):_looper_type(looper_type),_stop(false),_call_back(cb),_thread(std::thread(&AsyncLooper::threadEntry, this)){}~AsyncLooper() { stop(); }void stop() {_stop = true; // 将退出标志设置为true_con_cond.notify_all(); // 唤醒所有工作线程_thread.join(); // 等待工作线程退出}void push(const char* data, size_t len){std::unique_lock<std::mutex> lock(_mutex);// 条件变量控制,若缓冲区剩余空间大小等于数据长度,则可以添加数据if(_looper_type == AsyncType::ASYNC_SAFE)_pro_cond.wait(lock, [&](){ return _pro_buf.writeAbleSize() >= len; });// 能够走下来说明条件满足,可以向缓冲区添加数据了_pro_buf.push(data, len);// 唤醒消费者对缓冲区中的数据进行处理_con_cond.notify_one();}private:// 线程入口函数 -- 对消费者缓冲区中的数据进行处理,处理完毕后,初始化缓冲区,交换缓冲区void threadEntry(){// 为互斥锁设置一个生命周期,当缓冲区交换完毕就解锁while(1){{// 1.判断生产缓冲区有没有数据,有则交换,无则阻塞std::unique_lock<std::mutex> lock(_mutex);if(_stop && _pro_buf.empty()) break;// 若当前是退出前被唤醒或者是有数据被唤醒,则返回真,继续向下运行,否则重新进入休眠_con_cond.wait(lock, [&](){ return _stop || !_pro_buf.empty(); });_con_buf.swap(_pro_buf);// 2.唤醒生产者if(_looper_type == AsyncType::ASYNC_SAFE)_pro_cond.notify_all();}// 3.被唤醒后,对消费者缓冲区进行数据处理_call_back(_con_buf);// 4.初始化消费者缓冲区_con_buf.reset();}}private:Functor _call_back;private:AsyncType _looper_type; // 选择异步工作器工作模式(安全与非安全模式)std::atomic<bool> _stop; // 工作器退出标志Buffer _pro_buf; Buffer _con_buf; std::mutex _mutex;std::condition_variable _pro_cond;std::condition_variable _con_cond;std::thread _thread; // 异步工作器对应的工作线程};
}
#endif

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/163616.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode算法栈—有效的括号

目录 有效的括号 用到的数据结构&#xff1a; 位运算、Map 和 Stack Stack 常用的函数&#xff1a; 题解&#xff1a; 代码&#xff1a; 运行结果; 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判断字符…

模拟IIC通讯协议(stm32)(硬件iic后面在补)

一、IIC基础知识总结。 1、IIC通讯需要两条线就可以&#xff0c;SCL、SDA。 2、IIC的数据传输的速率&#xff0c;不同的ic是不同的&#xff0c;根据电平维持的延时函数的时间来确定IIC数据传输的速率. 3、IIC的延时函数可以使用延时函数&#xff0c;延时函数一般使用系统滴答时…

20款VS Code实用插件推荐

前言&#xff1a; VS Code是一个轻量级但功能强大的源代码编辑器&#xff0c;轻量级指的是下载下来的VS Code其实就是一个简单的编辑器&#xff0c;强大指的是支持多种语言的环境插件拓展&#xff0c;也正是因为这种支持插件式安装环境开发让VS Code成为了开发语言工具中的霸主…

Fast DDS之Subscriber

目录 SubscriberSubscriberQosSubscriberListener创建Subscriber DataReaderSampleInfo读取数据 Subscriber扮演容器的角色&#xff0c;里面可以有很多DataReaders&#xff0c;它们使用Subscriber的同一份SubscriberQos配置。Subscriber可以承载不同Topic和数据类型的DataReade…

【QT】QTreeWidget

新建项目 第一步&#xff1a;设置头标签 第二步&#xff1a;设置item 第三步&#xff1a;创建子item&#xff0c;挂载在顶层item下 完整代码 #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::W…

C++项目——云备份-①-项目介绍环境搭建

文章目录 专栏导读1.什么是云备份2.实现目标3.服务端程序负责功能4.服务端功能模块划分5.客户端程序负责功能6.客户端功能模块划分开发环境环境搭建1. gcc 升级7.3版本2.安装 jsoncpp 库3.下载bundle数据压缩库4.下载 httplib 库 专栏导读 &#x1f338;作者简介&#xff1a;花…

babel6使用ES2020最新js语法

babel6使用ES2020最新js语法 Babel 6 原本是不支持 ES2020 语法&#xff0c;因为它是在 Babel 7 中引入的。如果您想使用 ES2020 语法&#xff0c;您需要将 Babel 6 升级到 Babel 7 或更高版本(推荐),当然也可以在bebel6中安装支持某个语法的plugin,比如你想使用 ES2020 中的可…

Linux程序调试器——gdb的使用

gdb的概述 GDB 全称“GNU symbolic debugger”&#xff0c;从名称上不难看出&#xff0c;它诞生于 GNU 计划&#xff08;同时诞生的还有 GCC、Emacs 等&#xff09;&#xff0c;是 Linux 下常用的程序调试器。发展至今&#xff0c;GDB 已经迭代了诸多个版本&#xff0c;当下的…

完美解决 在将最终稿件上传到 IEEE PDF eXpress进行格式检查是出现“font not embedded“的问题 (不会出现自动压缩图像的现象)

最近中了一篇IEEE的论文&#xff0c;在校稿阶段&#xff0c;final paper是需要通过IEEE PDF eXpress网站的格式检查&#xff0c;然后出现一下问题&#xff1a; Errors: Font TimesNewRomanPS-BoldMT, TimesNewRomanPS-ItalicMT, TimesNewRomanPSMT is not embedded 用人话说就…

模式植物GO背景基因集制作

一边学习&#xff0c;一边总结&#xff0c;一边分享&#xff01; 写在前面 关于GO背景基因集文件的制作&#xff0c;我们在很早以前也发过。近两天&#xff0c;自己在分析时候&#xff0c;也是被搞了头疼。想重新制作一份GO背景基因集&#xff0c;进行富集分析。但是结果&…

JAVAEE初阶相关内容第十五弹--网络編程

写在前 简单描述一下关于路由器的三层转发和交换机的二层转发。 路由器是三层转发-->在网络层转发。【需要解析出IP协议中的源IP、目的IP来规划路径】 交换机是二层转发-->在数据链路层转发。【只需要关注下一步发展到哪个相邻的设备上&#xff0c;不需要IP地址&#…

JAVA生成ORC格式文件

一、背景 由于需要用到用java生成hdfs文件并上传到指定目录中&#xff0c;在Hive中即可查询到数据&#xff0c;基于此背景&#xff0c;开发此工具类 ORC官方网站&#xff1a;https://orc.apache.org/ 二、支持数据类型 三、工具开发 package com.xx.util;import com.alibab…

【计算机网络笔记】分组交换中的报文交付时间计算例题

系列文章目录 什么是计算机网络&#xff1f; 什么是网络协议&#xff1f; 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 系列文章目录题目解答 题目 在下图所示的采用“存储-转发”方式的分组交换网络中所有链路的数据传输速率为100 Mbps&#xff0c;分…

node+vue+mysql后台管理系统

千千博客系统&#xff0c;该项目作为一套多功能的后台框架模板&#xff0c;适用于绝大部分的后台管理系统开发。基于 vue.js&#xff0c;使用 vue-cli3 脚手架&#xff0c;引用 Element UI 组件库&#xff0c;数据库直连mysql方便开发快速简洁好看的组件。 功能包含如下&#…

EtherNet/IP转Modbus TCP协议网关的接口

远创智控的YC-EIPM-TCP网关产品&#xff0c;它有什么作用呢&#xff1f;一起来了解一下吧&#xff01; 远创智控YC-EIPM-TCP网关产品可以通过各种数据接口和工业领域的仪表、PLC、计量设备等产品连接&#xff0c;实时采集这些设备中的运行数据、状态数据等信息&#xff0c;并把…

【javascript】内部引入与外部引入javascript

创建a.html 内部引入&#xff1a; 外部引入&#xff1a; 创建a.js 注意&#xff1a; 我这里的a.js和a.html是放在同一个目录下&#xff0c;如果a.js放在js的目录下&#xff0c;a.html 调用a.js的时候 <script src"/js/a.js"></script>

sqlmap --os-shell选项原理解析

文章目录 sqlmap --os-shell选项原理解析原理解析总结 sqlmap --os-shell选项原理解析 以sqli第一关为例。 --os-shell 是 SQLMap 工具的一个参数&#xff0c;用于在成功注入数据库后&#xff0c;执行操作系统命令并获取其输出。 sqlmap -u "http://192.168.188.199/sq…

【C++】特殊类的设计(只在堆、栈创建对象,单例对象)

&#x1f30f;博客主页&#xff1a; 主页 &#x1f516;系列专栏&#xff1a; C ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ &#x1f60d;期待与大家一起进步&#xff01; 文章目录 一、请设计一个类&#xff0c;只能在堆上创建对象二、 请设计一个类&#xff0c;只能…

GO-unioffice实现word编辑

导包 import ("fmt""log""os""time""github.com/unidoc/unioffice/common/license""github.com/unidoc/unioffice/document" ) 创建word文件 func CreateFile(name string) {filename : name ".docx&quo…

【数据结构】栈

⭐ 作者&#xff1a;小胡_不糊涂 &#x1f331; 作者主页&#xff1a;小胡_不糊涂的个人主页 &#x1f4c0; 收录专栏&#xff1a;浅谈数据结构 &#x1f496; 持续更文&#xff0c;关注博主少走弯路&#xff0c;谢谢大家支持 &#x1f496; 栈-Stack 1. 什么是栈2. 栈的使用3.…