首页
/
每日頭條
/
生活
/
雲原生架構師基礎筆記
雲原生架構師基礎筆記
更新时间:2024-04-28 02:06:53
2.6.7 RabbitMQ -- Masstransit 詳解
  • Consumer 消費者
  • Producer 生産者
  • Request-Response 請求-響應
Consumer 消費者

在 MassTransit 中,一個消費者可以消費一種或多種消息

消費者的類型包括:普通消費者,saga,saga 狀态機,路由活動(分布式追蹤),處理器 handlers,工作消費者 job comsumers

  • Consumer
  • Instance
  • Handler
  • Others
Consumer

public class Program { public static async Task Main() { var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => { cfg.ReceiveEndpoint("order-service", e => { e.Consumer<SubmitOrderConsumer>(); }); }); } }

繼承 IConsumer,實現 Consume 方法

class SubmitOrderConsumer : IConsumer<SubmitOrder> { public async Task Consume(ConsumeContext<SubmitOrder> context) { await context.Publish<OrderSubmitted>(new { context.Message.OrderId }); } }

三個原則:

  • 擁抱 The Hollywood Principle, which states, "Dont't call us, we'll call you."
  • Consume 方法是一個被等待的方法,在執行中時其他消費者無法接收到這個消息,當這個方法完成的時候,消息被 ack,并且從隊列中移除
  • Task 方法異常會導緻消息觸發 retry,如果沒有配置重試,消息将被投遞到失敗隊列
Instance

public class Program { public static async Task Main() { var submitOrderConsumer = new SubmitOrderConsumer(); var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => { cfg.ReceiveEndpoint("order-service", e => { e.Instance(submitOrderConsumer); }); }); } }

所有接收到的消息都由一個消費者來實例來處理(請确保這個消費者類是線程安全)

Consumer 每次接收到消息都會 new 一個實例

Handler

public class Program { public static async Task Main() { var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => { cfg.ReceiveEndpoint("order-service", e => { e.Handler<SubmitOrder>(async context => { await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}"); }); }); }); } }

通過一個委托 Lambda 方法,來消費消息

Others
  • Saga<>
  • StateMachineSaga<>
Producer 生産者

消息的生産可以通過兩種方式産生:發送和發布

發送的時候需要指定一個具體的地址 DestinationAddress,發布的時候消息會被廣播給所有訂閱了這個消息類型的消費者

基于這兩種規則,消息被定義為:命令 command 和事件 event

  • send
  • publish
send

可以調用以下對象的 send 方法來發送 command:

  • ConsumeContext (在 Consumer 的 Consumer 方法參數中傳遞)
  • ISendEndpointProvider(可以從 DI 中獲取)
  • IBusControl(最頂層的控制對象,用來啟動和停止 masstransit 的控制器)
ConsumeContext

public class SubmitOrderConsumer : IConsumer<SubmitOrder> { private readonly IOrderSubmitter _orderSubmitter; public SubmitOrderConsumer(IOrderSubmitter submitter) => _orderSubmitter = submitter; public async Task Consume(IConsumeContext<SubmitOrder> context) { await _orderSubmitter.Process(context.Message); await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow)); } }

ISendEndpointProvider

public async Task SendOrder(ISendEndpointProvider sendEndpointProvider) { var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress); await endpoint.Send(new SubmitOrder { OrderId = "123" }); }

publish
  • 發送地址
  • 短地址
  • Convention Map
發送地址
  • rabbitmq://localhost/input-queue
  • rabbitmq://localhost/input-queue?durable=false
短地址
  • GetSendEndpoint(new Uri("queue:input-queue"))

雲原生架構師基礎筆記(雲原生架構師訓練營)1

Convention Map

在配置文件中指定 map 規則

EndpointConvention.Map<StartDelivery>(new Uri(ConfigurationManager.AppSettings["deliveryServiceQueue"]));

直接發送

public class SubmitOrderConsumer : IConsumer<SubmitOrder> { private readonly IOrderSubmitter _orderSubmitter; public SubmitOrderConsumer(IOrderSubmitter submitter) => _orderSubmitter = submitter; public async Task Consume(IConsumeContext<SubmitOrder> context) { await _orderSubmitter.Process(context.Message); await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow)); } }

可以調用以下對象的 publish 方法來發送 event:

  • ConsumeContext (在 Consumer 的 Consumer 方法參數中傳遞)
  • IPublishEndpoint(可以從 DI 中獲取)
  • IBusControl(最頂層的控制對象,用來啟動和停止 masstransit 的控制器)

IPublishEndpoint

public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint) { await publishEndpoint.Publish<OrderSubmitted>(new { OrderId = "27", OrderDate = DateTime.UtcNow, }); }

Request-Response 請求-響應

Request-Response 模式讓應用程序之間解耦之後,依然采用同步的方式

  • Consumer
  • IClientFactory
  • IRequestClient
  • Send a request
Consumer

public async Task Consume(ConsumeContext<CheckOrderStatus> context) { var order = await _orderRepository.Get(context.Message.OrderId); if (order == null) throw new InvalidOperationException("Order not found"); await context.RespondAsync<OrderStatusResult>(new { OrderId = order.Id, order.Timestamp, order.StatusCode, order.StatusText }); }

需要處理返回類型 OrderStatusResult,異步方式模拟同步,實際上同樣有消息隊列,消費者處理過程

IClientFactory

public interface IClientFactory { IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout); IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout); RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout); RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout); }

通過 IBusControl 的 CreateClientFactory 方法可以得到 ClientFactory

IRequestClient

public interface IRequestClient<TRequest> where TRequest : class { RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout); Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout); }

RequestClient 可以創建請求,或者直接獲得響應

Send a request

var serviceAddress = new Uri("rabbitmq://localhost/check-order-status"); var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress); var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id});

,
Comments
Welcome to tft每日頭條 comments! Please keep conversations courteous and on-topic. To fosterproductive and respectful conversations, you may see comments from our Community Managers.
Sign up to post
Sort by
Show More Comments
推荐阅读
穿越火線手遊荒島特訓攻略槍械
穿越火線手遊荒島特訓攻略槍械
荒島特訓作為CF手遊新出的模式,想要一步成神當然是十分困難的。沒有經曆落地成盒是很難體會到這遊戲的精髓,但我們的目标是成為最後那名存活者。所以今天我們就來看看兩條單排必須要知道的經驗,打開你吃雞的新世界大門。經驗一:确定降落地點飛機上确認飛...
2024-04-28
窗簾紗在裡面還是外面
窗簾紗在裡面還是外面
窗簾紗在裡面還是外面?在安裝這樣的雙層窗簾時,紗簾通常情況下都是裝在裡面也就是靠近窗子的那一邊的因為紗簾裝在外面的話,是比較影響美觀的與屋内整體風格的,裝窗簾的效果也會大打折扣,所以建議最好是裝在裡面,下面我們就來聊聊關于窗簾紗在裡面還是外...
2024-04-28
隐形眼鏡企業法律法規
隐形眼鏡企業法律法規
據國家企業信用信息公示系統顯示,近日,陌森眼鏡關聯公司廈門雅瑞光學有限公司因違法《廣告法》相關規定,新增一條行政處罰記錄。處罰事由為:根據《中華人民共和國廣告法》第五十八條第一款第(十四)項,對該公司處涉案違法廣告費用一倍,即218310元...
2024-04-28
網名簡單有内涵
網名簡單有内涵
網名簡單有内涵?眠意寄給秋一瞬間的回眸,今天小編就來聊一聊關于網名簡單有内涵?接下來我們就一起去研究一下吧!網名簡單有内涵眠意寄給秋一瞬間的回眸仙兒夢也渺渺晚螢花謝為誰悲思歸花獨、蝶雙飛藏在星辰大海半城柳色半聲笛淺墨惜晨清冷南尋别來無恙做你...
2024-04-28
楊廣有沒有殺了楊堅
楊廣有沒有殺了楊堅
大業十四年(618年),以宇文化及為名義共主的骁果軍發動了“江都兵變”,隋朝第二代皇帝隋炀帝楊廣被骁果軍勒死。雖然還有幾個楊姓宗室在隋炀帝楊廣死後擁有皇帝的頭銜,但隋朝在“江都兵變”之後已經實際滅亡,隋炀帝楊廣就成為隋朝的亡國皇帝。不過有人...
2024-04-28
Copyright 2023-2024 - www.tftnews.com All Rights Reserved