C# 中使用 MassTransit

在生产环境中使用 MassTransit 时,通常需要进行详细的配置,包括设置连接字符串、配置队列、配置消费者、处理重试和错误队列等。以下是一个完整的示例,展示了如何在 ASP.NET Core 应用程序中配置 MassTransit,包括请求/响应模式和事件模式,并使用依赖注入。

1. 安装必要的 NuGet 包

首先,确保安装了以下 NuGet 包:

  • MassTransit
  • MassTransit.RabbitMQ
  • MassTransit.AspNetCore
  • MassTransit.ExtensionsDependencyInjection
  • Microsoft.Extensions.Hosting

web端服务代码

<Project Sdk="Microsoft.NET.Sdk.Web"><PropertyGroup><TargetFramework>net6.0</TargetFramework><Nullable>enable</Nullable><ImplicitUsings>enable</ImplicitUsings></PropertyGroup><ItemGroup><PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.3.1" /><PackageReference Include="MassTransit.Redis" Version="8.3.4" /><PackageReference Include="Swashbuckle.AspNetCore" Version="7.2.0" /><PackageReference Include="MassTransit.RabbitMQ" Version="8.3.4" /></ItemGroup><ItemGroup><ProjectReference Include="..\AppDto\AppDto.csproj" /></ItemGroup></Project>
using Microsoft.OpenApi.Models;
using MassTransit;
using WebApp_MssTransit.Filters;var builder = WebApplication.CreateBuilder(args);// Add services to the container.
builder.Services.AddMvc(opt =>
{opt.Filters.Add<ExceptionFilter>();
});
builder.Services.AddSwaggerGen(c =>
{c.SwaggerDoc("v1", new OpenApiInfo { Title = "My API", Version = "v1" });
});builder.Services.AddMassTransit(x =>
{// 使用内存//x.UsingInMemory();// 使用RabbitMqx.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", host =>{host.Username("admin");host.Password("admin");});});
});var app = builder.Build();// Configure the HTTP request pipeline.app.UseHttpsRedirection();
app.UseRouting();app.UseSwagger();
app.UseSwaggerUI(c => c.SwaggerEndpoint("v1/swagger.json", "My API V1"));app.UseEndpoints(endpoints =>
{endpoints.MapControllers();
});app.Run();

控制器里面

using AppDto;
using MassTransit;
using Microsoft.AspNetCore.Mvc;
using System.Text.Json;
using System.Text.Json.Serialization;namespace WebApp_MssTransit.Controllers;[ApiController]
[Route("[controller]")]
public class PublishController : ControllerBase
{private readonly ILogger<PublishController> _logger;private readonly IBus _bus;public PublishController(ILogger<PublishController> logger, IBus bus){_logger = logger;_bus = bus;}[HttpPost(template : nameof(PublishAsync))]public async Task PublishAsync(){var msg = new OrderMessage(){Id = Guid.NewGuid(),Name = "Phone",CreationTime = DateTime.Now};await _bus.Publish(msg);_logger.LogInformation($"Publish message :{JsonSerializer.Serialize(msg)}");}[HttpPost(template: nameof(SendAsync))]public async Task SendAsync(){var msg = new OrderMessage(){Id = Guid.NewGuid(),Name = "Phone",CreationTime = DateTime.Now};await _bus.Send(msg);_logger.LogInformation($"Send message :{JsonSerializer.Serialize(msg)}");}[HttpPost(template: nameof(RequestAirAsync))]public async Task<AirResponse> RequestAirAsync(){var airRequest = new AirRequest{CreationTime = DateTime.Now,Identity = DateTime.Now.Ticks};var esp = await _bus.Request<AirRequest, AirResponse>(airRequest);_logger.LogInformation($"{DateTime.Now.ToStringTime()} Request Response :{JsonSerializer.Serialize(esp)}");var espMsg = esp.Message;_logger.LogInformation($"{espMsg.CreationTime.ToStringTime()} Request Response data");return espMsg;}
}

信息公共类型,这里没有定义接口,直接共享AppDto

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public class AirRequest
{public long Identity { get; set; }public DateTime CreationTime { get; set; }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public class AirResponse
{public long Identity { get; set; }public DateTime CreationTime { get; set; }
}
namespace AppDto;public class OrderMessage
{public Guid Id { get; init; }public string Name { get; set; }public DateTime CreationTime { get; set; }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public static class DateTimeExtant
{public static string ToStringTime(this DateTime time){return time.ToString("yyyy-MM-dd HH:mm:ss.fffff");}
}

接收消息处理端

<Project Sdk="Microsoft.NET.Sdk"><PropertyGroup><OutputType>Exe</OutputType><TargetFramework>net6.0</TargetFramework><ImplicitUsings>enable</ImplicitUsings><Nullable>enable</Nullable></PropertyGroup><ItemGroup><PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.3.1" /><PackageReference Include="MassTransit.RabbitMQ" Version="8.3.4" /><PackageReference Include="MassTransit.Redis" Version="8.3.4" /><PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" /></ItemGroup><ItemGroup><ProjectReference Include="..\AppDto\AppDto.csproj" /></ItemGroup></Project>

using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MassTransit.RedisIntegration;
using ConsoleApp_Producer;
using MassTransit.Metadata;
using MassTransit.Util;
using System.Reflection;
using MassTransit.Internals;
using AppDto;
using ConsoleApp_consumer;Console.WriteLine("Hello, World!");var builder = Microsoft.Extensions.Hosting.Host.CreateDefaultBuilder(args);builder.ConfigureServices((hostContext, services) =>{services.AddMassTransit(x =>{// 通过扫描程序集注册消费者//x.AddConsumers(typeof(OrderConsumer).Assembly);// 通过类型单个注册消费者x.AddConsumer<OrderConsumer>();x.AddConsumer<AirHandle>();//x.SetKebabCaseEndpointNameFormatter();// 通过泛型单个注册消费者//x.AddConsumer<OrderEtoConsumer, OrderEtoConsumerDefinition>();// 通过指定命名空间注册消费者// x.AddConsumersFromNamespaceContaining<OrderEtoConsumer>();// 使用内存队列// x.UsingInMemory();x.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", hostconfig =>{hostconfig.Username("admin");hostconfig.Password("admin");});config.ConfigureEndpoints(context);});});});var host = builder.Build();
host.Run();
using AppDto;
using MassTransit;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;namespace ConsoleApp_Producer;public class OrderConsumer : IConsumer<OrderMessage>
{private readonly ILogger<OrderConsumer> _logger;public OrderConsumer(ILogger<OrderConsumer> logger){_logger = logger;}public Task Consume(ConsumeContext<OrderMessage> context){_logger.LogInformation($"{DateTime.Now.ToStringTime()}收到消息:{JsonSerializer.Serialize(context.Message)},\n\t{context.Message.CreationTime.ToStringTime()} 消息时间");return Task.CompletedTask;}
}
using AppDto;
using ConsoleApp_Producer;
using MassTransit;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace ConsoleApp_consumer;public class AirHandle : IConsumer<AirRequest>
{private readonly ILogger<AirHandle> _logger;public AirHandle(ILogger<AirHandle> logger){_logger = logger;}public async Task Consume(ConsumeContext<AirRequest> context){_logger.LogInformation($"{DateTime.Now.ToStringTime()}Receive Data :{context.Message.Identity}");_logger.LogInformation($"{context.Message.CreationTime.ToStringTime()} message data");var resp = new AirResponse{Identity = context.Message.Identity,CreationTime = DateTime.Now,};await context.RespondAsync(resp);_logger.LogInformation($"{DateTime.Now.ToStringTime()} Respond  Data {resp}");}
}

7. 运行应用程序

运行应用程序后,你可以通过发送 HTTP 请求来发布消息和发送请求。例如,使用 Postman 或 cURL:

  • 发布消息:

在这里插入图片描述### 8. 处理错误和重试

在生产环境中,处理错误和重试是很重要的。你可以在 MassTransit 配置中启用重试和死信队列:

x.UseInMemoryOutbox();
x.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(10)));
x.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(30)));
x.UseFault tolerantExceptionStrategies();

9. 监控和日志

为了监控和调试,可以配置日志记录和监控工具。MassTransit 支持多种日志记录框架,如 Serilog 或 NLog。

10. 安全性和认证

在生产环境中,确保 RabbitMQ 连接是安全的,使用合适的认证和授权机制。可以配置 TLS 和其他安全选项。

通过以上步骤,你可以在生产环境中配置和使用 MassTransit,实现请求/响应和事件处理模式,并利用依赖注入进行服务管理和生命周期管理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/497682.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Hackthebox 中英 Write-Up】Manipulating a CRUD API | 操控 CRUD API:一步步提取 Flag

Objective | 目标 This challenge demonstrates how to interact with a CRUD API to perform Update, Delete, and Search operations to retrieve the flag. 本次挑战旨在演示如何与 CRUD API 交互&#xff0c;通过执行 更新、删除 和 搜索 操作来获取 Flag。 操控 CRUD AP…

【OpenGL ES】GLSL基础语法

1 前言 本文将介绍 GLSL 中数据类型、数组、结构体、宏、运算符、向量运算、矩阵运算、函数、流程控制、精度限定符、变量限定符&#xff08;in、out、inout&#xff09;、函数参数限定符等内容&#xff0c;另外提供了一个 include 工具&#xff0c;方便多文件管理 glsl 代码&a…

【Compose multiplatform教程18】多平台资源的设置和配置

要正确配置项目以使用多平台资源&#xff0c;请执行以下操作&#xff1a; 添加库依赖项。 为每种资源创建必要的目录。 为限定资源创建其他目录&#xff08;例如&#xff0c;深色 UI 主题或本地化字符串的不同图像&#xff09;。 依赖项和目录设置 要访问多平台项目中的资源…

线索二叉树的实现(c语言)

一、前言&#xff1a;什么是二叉树的线索化&#xff1f;&#xff08;为什么要有二叉树的线索化&#xff1f;&#xff09; 通过前面内容的学习&#xff0c;我们知道了二叉树的存储结构其实是通过二叉链表的方式实现的。但二叉链表由于每个结点均有左右孩子域&#xff0c;这使得…

通过python对excel进行数据分析和可视化

import pandas as pd import matplotlib.pyplot as plt import seaborn as snsfile_path "C:\\Users\\86138\\Desktop\\book_list-计算机-机器学习-linux-android-数据库-互联网.xlsx" data pd.read_excel(file_path)need_data data[[书名, 评分, 评价人数]].copy…

考研互学互助系统|Java|SSM|VUE| 前后端分离

【技术栈】 1⃣️&#xff1a;架构: B/S、MVC 2⃣️&#xff1a;系统环境&#xff1a;Windowsh/Mac 3⃣️&#xff1a;开发环境&#xff1a;IDEA、JDK1.8、Maven、Mysql5.7 4⃣️&#xff1a;技术栈&#xff1a;Java、Mysql、SSM、Mybatis-Plus、VUE、jquery,html 5⃣️数据库可…

SpringCloud整合skywalking实现链路追踪和日志采集

1.部署skywalking https://blog.csdn.net/qq_40942490/article/details/144701194 2.添加依赖 <!-- 日志采集 --><dependency><groupId>org.apache.skywalking</groupId><artifactId>apm-toolkit-logback-1.x</artifactId><version&g…

【Spring】 Bean 注入 HttpServletRequest 能保证线程安全的原理

文章目录 前言1. 图示2. 源码坐标后记 前言 今天看了一段老业务代码&#xff0c;HttpServletRequest 被注入后直接用于业务逻辑。 好奇Spring是如何解决线程安全问题。 Controller public class TestController {ResourceHttpServletRequest request;ResponseBodyGetMapping(…

0基础学前端-----CSS DAY11

0基础学前端-----CSS DAY11 视频参考&#xff1a;B站Pink老师 今天是CSS学习的第十一天&#xff0c;今天开始的笔记对应Pink老师课程中的CSS第六天的内容。 本节重点&#xff1a;CSS定位 本章目录 0基础学前端-----CSS DAY11本节目标1. 定位1.1 为什么需要定位1.2 定位组成1.…

frameworks 之 WMS添加窗口流程

frameworks 之 触摸事件窗口查找 1.获取WindowManager对象2.客户端添加view3. 服务端添加view (NO_SURFACE)4.重新布局 (DRAW_PENDING)4.1 创建 SurfaceControl 5.通知绘制 (COMMIT_DRAW_PENDING&#xff0c; READY_TO_SHOW&#xff0c; HAS_DRAWN)5. 1 布局测量和刷新 6.总结 …

ARM单片机定时器

定时器分类 定时器资源。基于GD32F103

【Java】IO流练习

IO流练习 题干&#xff1a; 根据指定要求&#xff0c;完成电话记录、 注册、登录 注册 题干&#xff1a; 完成【注册】功能&#xff1a; 要求&#xff1a; 用户输入用户名、密码存入users.txt文件中 若users.txt文件不存在&#xff0c;创建该文件若users.txt文件存在 输入…

Windows API Set:那些“只存在但不被使用“的DLL

API Set 是什么&#xff1f; 想象一下&#xff0c;Windows就像一个大型图书馆&#xff0c;而API Set就是这个图书馆的索引系统。但这个索引系统非常特别&#xff1a;它是直接内置在Windows加载器中的"虚拟目录"。 // 一个典型的API Set映射示例 api-ms-win-core-mem…

小程序配置文件 —— 12 全局配置 - pages配置

全局配置 - pages配置 在根目录下的 app.json 文件中有一个 pages 字段&#xff0c;这里我们介绍一下 pages 字段的具体用法&#xff1b; pages 字段&#xff1a;用来指定小程序由哪些页面组成&#xff0c;用来让小程序知道由哪些页面组成以及页面定义在哪个目录&#xff0c;…

正则表达式 - 使用总结

以下列出了一些常用正则表达式的使用总结&#xff1a; 匹配基本字符 使用普通字符匹配&#xff1a;普通字符&#xff08;如字母、数字、符号&#xff09;在正则表达式中表示自身&#xff0c;例如匹配 "runoob"&#xff1a;/runoob/。 使用元字符 . 匹配任意字符&…

OpenCV计算机视觉 03 椒盐噪声的添加与常见的平滑处理方式(均值、方框、高斯、中值)

上一篇文章&#xff1a;OpenCV计算机视觉 02 图片修改 图像运算 边缘填充 阈值处理 目录 添加椒盐噪声 图像平滑常见处理方式 均值滤波 (blur) 方框滤波 (boxFilter) ​高斯滤波 (GaussianBlur) 中值滤波 (medianBlur) 添加椒盐噪声 def add_peppersalt_noise(image, n…

告别 $arr[0]: PHP 和 Laravel 中更优雅的数组处理方式

你是否曾经历过这样的惊魂时刻&#xff1a;线上代码突然崩溃&#xff0c;只因访问了一个不存在的数组元素&#xff1f;或者更糟的是&#xff0c;应用开始疯狂抛出错误&#xff0c;而你却毫无头绪&#xff1f;这一切的罪魁祸首可能就是看似人畜无害的硬编码数组索引&#xff0c;…

uniapp 微信小程序 数据空白展示组件

效果图 html <template><view class"nodata"><view class""><image class"nodataimg":src"$publicfun.locaAndHttp()?localUrl:$publicfun.httpUrlImg(httUrl)"mode"aspectFit"></image>&l…

【开源免费】基于SpringBoot+Vue.JS网上摄影工作室系统(JAVA毕业设计)

本文项目编号 T 103 &#xff0c;文末自助获取源码 \color{red}{T103&#xff0c;文末自助获取源码} T103&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

SOEM裸机移植

源码地址 https://gitee.com/rathon/apollof429-v2.git 还有一些移植细节可以参考我之前写的一些博客 硬件平台&#xff1a; 正点原子APOLLOF429V2开发板 开发环境 stm32cubemx6.2.0版本&#xff0c;用的库为STM32Cube_FW_F4_V1.26.2&#xff0c;开发软件为STM32cubeide …