组件安装
Install-Package Apache.NMS.ActiveMQ
.NET 组件连接 activemq
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.Util;
using Microsoft.Extensions.Configuration;namespace GraphqlApp.BackendAPI.Services
{public class MqService{private string _mqUrl = string.Empty;private string _Q_Biz1 = string.Empty;public string Q_Biz1{get {return _Q_Biz1;}}public MqService(IConfiguration configuration){_mqUrl = configuration.GetValue<string>("Mq:MqUrl");_Q_Biz1 = configuration.GetValue<string>("Mq:Q_Biz1");}private IConnection CreateConnection(){Uri _uri = new Uri(String.Concat($"activemq:failover:({_mqUrl})"));IConnectionFactory factory = new ConnectionFactory(_uri);var result = factory.CreateConnection();result.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;return result;}public void Product(string queuesName, string msg){Task.Run(() =>{using (IConnection _conn = CreateConnection()){using (Apache.NMS.ISession _session = _conn.CreateSession()){IDestination _destination = SessionUtil.GetDestination(_session, queuesName);using (IMessageProducer producer = _session.CreateProducer(_destination)){ITextMessage request = _session.CreateTextMessage(msg);producer.Send(request);}}}});}public void Consume(string queuesName, Func<string, int> func){Task.Run(() =>{using (IConnection conn = this.CreateConnection()){ using (ISession session = conn.CreateSession(AcknowledgementMode.ClientAcknowledge)){ conn.Start();IDestination destination = SessionUtil.GetDestination(session, queuesName);using (IMessageConsumer consumer = session.CreateConsumer(destination)){consumer.Listener += (IMessage message) =>{ITextMessage msg = (ITextMessage)message;Console.WriteLine("从MQ接收到消息:" + msg.Text);var funcResult = func(msg.Text);if (funcResult > 0)msg.Acknowledge();};Console.ReadLine();}}}});//Console.ReadLine();}}
}