Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core (2)

根据 proto 文件生成代码

protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\CreateOrder.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\

客户端代码,创建订单

... response, err := client.InvokeService(context.Background(), &pb.InvokeServiceEnvelope{ Id: "OrderService", Data: createOrderRequestData, Method: "createOrder", }) if err != nil { fmt.Println(err) return } ...

添加 DataToPublish.proto 文件,此文件作为事件发布数据结构

syntax = "proto3"; package daprexamples; option java_outer_classname = "DataToPublishProtos"; option java_package = "generate.protos"; message StorageReduceData { string ProductID = 1; int32 Amount=2; }

生成 DataToPublish 代码

protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\DataToPublish.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\

修改 main.go 代码,根据 createOrder 结果判断是否要发布信息到消息队列

... createOrderResponse := &daprexamples.CreateOrderResponse{} if err := proto.Unmarshal(response.Data.Value, createOrderResponse); err != nil { fmt.Println(err) return } fmt.Println(createOrderResponse.Succeed) if !createOrderResponse.Succeed { //下单失败 return } storageReduceData := &daprexamples.StorageReduceData{ ProductID: createOrderRequest.ProductID, Amount: createOrderRequest.Amount, } storageReduceDataData, err := jsoniter.ConfigFastest.Marshal(storageReduceData) //ptypes.MarshalAny(storageReduceData) if err != nil { fmt.Println(err) return } _, err = client.PublishEvent(context.Background(), &pb.PublishEventEnvelope{ Topic: "Storage.Reduce", Data: &any.Any{Value: storageReduceDataData}, }) fmt.Println(storageReduceDataData) if err != nil { fmt.Println(err) } else { fmt.Println("Published message!") } ...

注意: 发送数据前,使用 jsoniter 转换数据为 json 字符串,原因是如果直接传输 Grpc 流,当前版本(0.3.x) Dapr runtime 打包数据时使用 Json 打包,解包使用 String ,导致数据不一致。

复制 rabbimq.yaml 文件到 components 文件夹,删除原有 redis_messagebus.yaml 文件

启动 golang Grpc 客户端

dapr run --app-id client go run main.go

输出

== APP == true == APP == Published message!

RabbitMQ

在浏览器中输入 :15672/ ,账号和密码均为 guest

查看 Connections ,有3个连接

这个3个连接来自配置了 messagebus.yaml 组件的三个服务

查看 Exchanges

Name Type Features Message rate in Message rate out (AMQP default) direct D Storage.Reduce fanout D amq.direct direct D amq.fanout fanout D ...

着重看 Storage.Reduce ,可以看出 Dapr 运行时创建了一个 fanout 类型的 Exchange ,这表明该 Exhange 中的数据是广播的。

查看 Queues

Dapr 运行时创建了 storageService-Storage.Reduce ,该 Queue 绑定了 Storage.Reduce Exchange ,所以可以收到 Storage.Reduce 的广播数据。

DotNet Core StorageService.Api 改造以完成 Sub 事件

打开 DaprClientService.cs 文件,更改内容为

public sealed class DaprClientService : DaprClient.DaprClientBase { private readonly StorageContext _storageContext; public DaprClientService(StorageContext storageContext) { _storageContext = storageContext; } public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context) { var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope(); topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce"); return Task.FromResult(topicSubscriptionsEnvelope); } public override async Task<Empty> OnTopicEvent(CloudEventEnvelope request, ServerCallContext context) { if (request.Topic.Equals("Storage.Reduce")) { StorageReduceData storageReduceData = StorageReduceData.Parser.ParseJson(request.Data.Value.ToStringUtf8()); Console.WriteLine("ProductID:" + storageReduceData.ProductID); Console.WriteLine("Amount:" + storageReduceData.Amount); await HandlerStorageReduce(storageReduceData); } return new Empty(); } private async Task HandlerStorageReduce(StorageReduceData storageReduceData) { Guid productID = Guid.Parse(storageReduceData.ProductID); Storage storageFromDb = await _storageContext.Storage.FirstOrDefaultAsync(q => q.ProductID.Equals(productID)); if (storageFromDb == null) { return; } if (storageFromDb.Amount < storageReduceData.Amount) { return; } storageFromDb.Amount -= storageReduceData.Amount; Console.WriteLine(storageFromDb.Amount); await _storageContext.SaveChangesAsync(); }

说明

添加 GetTopicSubscriptions() 将完成对主题的关注

当应用停止时,RabbitMQ 中的 Queue 自动删除

添加 OnTopicEvent() 重写,此方法将完成对 Sub 主题的事件处理

HandlerStorageReduce 用于减少库存

启动 DotNet Core StorageService.Api Grpc 服务,启动 Java OrderService Grpc 服务,启动 Go Grpc 客户端

DotNet Core

dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run

Java

dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"

go

dapr run --app-id client go run main.go

go grpc 输出为

== APP == true == APP == Published message!

查看 MySql Storage 数据库,对应产品库存减少 20

至此,通过 Dapr runtime 完成了 Go 和 Java 之间的 Grpc 调用,并通过 RabbitMQ 组件完成了 Pub/Sub

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wspxjx.html