Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core

前置条件:
Dapr运用》
Dapr 运用之 Java gRPC 调用篇》
《Dapr 运用之集成 Asp.Net Core Grpc 调用篇》

搭建 RabbitMQ

Docker 搭建 RabbitMQ 服务

docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

创建 rabbiqmq.yaml

apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: messagebus spec: type: pubsub.rabbitmq metadata: - name: host value: "amqp://localhost:5672" # Required. Example: "rabbitmq.default.svc.cluster.local:5672" - name: consumerID value: "61415901178272324029" # Required. Any unique ID. Example: "myConsumerID" - name: durable value: "true" # Optional. Default: "false" - name: deletedWhenUnused value: "false" # Optional. Default: "false" - name: autoAck value: "false" # Optional. Default: "false" - name: deliveryMode value: "2" # Optional. Default: "0". Values between 0 - 2. - name: requeueInFailure value: "true" # Optional. Default: "false".

改造 StorageService.Api

目的:把 StorageService 从 Grpc 客户端改造为 Grpc 服务端,并 Sub Storage.Reduce 主题,完成减库存操作。

删除 Storage 中无用的代码 StorageController.cs

修改 Program.cs 中的 CreateHostBuilder 代码为

public static IHostBuilder CreateHostBuilder(string[] args) { return Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { webBuilder.ConfigureKestrel(options => { options.Listen(IPAddress.Loopback, 5003, listenOptions => { listenOptions.Protocols = HttpProtocols.Http2; }); }); webBuilder.UseStartup<Startup>(); }); }

添加 DaprClientService

public sealed class DaprClientService : DaprClient.DaprClientBase { public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context) { var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope(); topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce"); return Task.FromResult(topicSubscriptionsEnvelope); } }

Dapr 运行时将调用此方法获取 StorageServcie 关注的主题列表

修改 Startup.cs

/// <summary> /// This method gets called by the runtime. Use this method to add services to the container. /// </summary> /// <param>Services.</param> public void ConfigureServices(IServiceCollection services) { services.AddGrpc(); services.AddDbContextPool<StorageContext>(options => { options.UseMySql(Configuration.GetConnectionString("MysqlConnection")); }); } /// <summary> /// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. /// </summary> /// <param>app.</param> /// <param>env.</param> public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseRouting(); app.UseEndpoints(endpoints => { endpoints.MapSubscribeHandler(); endpoints.MapGrpcService<DaprClientService>(); }); }

复制 rabbimq.yaml 文件到 components 文件夹中,删除 redis_messagebus.yaml 文件

启动 StorageService 服务

dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run

使用 Java 开发一个 Order 服务端,Order 服务提供的功能为

下单

查看订单详情

获取订单列表

在当前上下文中着重处理的是下单功能,以及下单成功后 Java 服务端将发布一个事件到 Storage.Reduce 主题,即减少库存。

创建 CreateOrder.proto 文件

syntax = "proto3"; package daprexamples; option java_outer_classname = "CreateOrderProtos"; option java_package = "generate.protos"; service OrderService { rpc CreateOrder (CreateOrderRequest) returns (CreateOrderResponse); rpc RetrieveOrder(RetrieveOrderRequest) returns(RetrieveOrderResponse); rpc GetOrderList(GetOrderListRequest) returns(GetOrderListResponse); } message CreateOrderRequest { string ProductID = 1; //Product ID int32 Amount=2; //Product Amount string CustomerID=3; //Customer ID } message CreateOrderResponse { bool Succeed = 1; //Create Order Result,true:success,false:fail } message RetrieveOrderRequest{ string OrderID=1; } message RetrieveOrderResponse{ Order Order=1; } message GetOrderListRequest{ string CustomerID=1; } message GetOrderListResponse{ repeated Order Orders=1; } message Order{ string ID=1; string ProductID=2; int32 Amount=3; string CustomerID=4; }

使用 protoc 生成 Java 代码

protoc -I=C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples --java_out=C:\Users\JR\DaprDemos\java\examples\src\main\java C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples\CreateOrder.proto

引用 MyBatis 做为 Mapper 工具

修改 HelloWorldService.java 文件,提取 GrpcHelloWorldDaprService.java 到单独的包中,在此文件中添加 createOrder() 、 getOrderList() 、 retrieveOrder() 三个函数的实现

复制 rabbimq.yaml 文件到 components 文件夹中,删除原有 redis_messagebus.yaml 文件

启动 OrderService 服务

dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"

创建 Golang Grpc 客户端,该客户端需要完成创建订单 Grpc 调用,订单创建成功发布扣除库存事件

引用 CreateOrder.proto 文件,并生成 CreateOrder.pb.go 文件

如未安装 protoc-gen-gogo ,通过一下命令获取并安装

go get github.com/gogo/protobuf/gogoproto

安装 protoc-gen-gogo

go install github.com/gogo/protobuf/gogoproto

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wspxjx.html