在上一篇博文里我们介绍了通过gRPC实现JDBC数据库的streaming,这篇我们介绍关于cassandra的streaming实现方式。如果我们需要从一个未部署cassandra的节点或终端上读取cassandra数据,可以用gRPC来搭建一个数据桥梁来连接这两端。这时cassandra这端就是gRPC-Server端,由它提供cassandra的数据服务。
在前面sdp系列讨论里我们已经实现了Cassandra-Engine。它的运作原理还是通过某种Context把指令提交给cassandra去执行。我们先设计一个创建库表的例子。CQL语句和Cassandra-Engine程序代码如下,这是客户端部分:
val dropCQL = "DROP TABLE IF EXISTS testdb.AQMRPT" val createCQL =""" CREATE TABLE testdb.AQMRPT ( rowid bigint primary key, measureid bigint, statename text, countyname text, reportyear int, value int, created timestamp )""" val cqlddl = CQLUpdate(statements = Seq(dropCQL,createCQL)) def createTbl: Source[CQLResult,NotUsed] = { log.info(s"running createTbl ...") Source .single(cqlddl) .via(stub.runDDL) }