为了分析在插入海量数据到Cassandra集群或者Oracle时的表现,也就是插入速率,我们用Java程序对插入数据的用时进行了采样,最终用JFreeChart把采样结果绘制出来了。
为了公平起见,我们做了以下处理:
1.所有的循环变量都放在了循环外面
2.对于Cassandra的replication-factor设置为1,这样插入数据不需要插入额外的备份。
3.对于Oracle我们用预编译语句,这样插入操作的执行计划可以重用。
4.所有的测试都在周末进行,这样不可能有其他人去干扰这些服务器。
5.这些机器上运行的其他进程都被我kill掉了,这样保证CPU,内存的专用性。
6.在用java代码插入Cassandra记录时候,我采用了thrift API, 因为它的效率比Hector API高。
以下是实验(分两部分,一是采样部分,二是数据分析部分)
Part 1:采样:
Cassandra的采样:
我们这里依然用循环插入50W条记录,不同的是,在循环的开始和循环每10000条记录时,我们把时间戳记录在List中,最终把这个List写入文本文件(cassandra_input_sample_data.txt):
package com.charles.cassandra.demo; import java.io.File; import java.io.FileWriter; import java.util.ArrayList; import java.util.List; import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.Column; import org.apache.cassandra.thrift.ColumnParent; import org.apache.cassandra.thrift.ConsistencyLevel; import org.apache.cassandra.thrift.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import com.charles.cassandra.util.CassandraOperationUtil; public class CassandraClusterStressTest { public static void main(String[] args) throws Exception { //包装好的socket TTransport tr = new TFramedTransport(new TSocket("192.168.129.34",9160)); TProtocol proto = new TBinaryProtocol(tr); Cassandra.Client client = new Cassandra.Client(proto); tr.open(); if(!tr.isOpen()) { System.out.println("无法连接到服务器!"); return; } System.out.println("开始压力测试,我们插入50W条数据到2节点集群中"); System.out.println("..."); //标记开始时间 long startTime = System.currentTimeMillis(); client.set_keyspace("Charles_Stress_Test2");//使用Charles_Stress_Test keyspace ColumnParent parent = new ColumnParent("student");//column family /* * 这里我们插入50万条数据到Student内 * 每条数据包括id和name */ String key_user_id = "a"; String k; long timestamp; Column idColumn =null; Column nameColumn=null; //这个sampleData代表了每插入1W条记录到Cassandra集群的用时毫秒的数据样本 List<Integer> sampleData = new ArrayList<Integer>(51); for(int i = 0;i < 500000;i++) { k = key_user_id + i;//row key timestamp = System.currentTimeMillis();//时间戳 //每行的第一个字段(id字段) idColumn = new Column(CassandraOperationUtil.stringToByteBuffer("id"));//字段名 idColumn.setValue(CassandraOperationUtil.stringToByteBuffer(i + ""));//字段值 idColumn.setTimestamp(timestamp);//时间戳 client.insert( CassandraOperationUtil.stringToByteBuffer(k), parent, idColumn, ConsistencyLevel.ONE); //每行的第二个字段(name字段) nameColumn = new Column(CassandraOperationUtil.stringToByteBuffer("name")); nameColumn.setValue(CassandraOperationUtil.stringToByteBuffer("student" + i)); nameColumn.setTimestamp(timestamp); client.insert( CassandraOperationUtil.stringToByteBuffer(k), parent, nameColumn, ConsistencyLevel.ONE); //判断是否这是起始记录(用于标记起始时间戳)和第N 万条记录(第N万条记录的时间戳) if( (i==0) || ( (i+1)%10000==0)){ sampleData.add((int)(timestamp)); } } //标记结束时间 long endTime = System.currentTimeMillis(); //标记一共用时 long elapsedTime = endTime-startTime; System.out.println("压力测试完毕,用时: "+elapsedTime+" 毫秒"); //关闭连接 tr.close(); //压力测试结束后,我们把所有的样本数据写入文件中等待处理 FileWriter fw = new FileWriter(new File("cassandra_insert_sample_data.txt")); for(int j=0;j<sampleData.size();j++){ fw.write(sampleData.get(j)+"\n"); } fw.flush(); fw.close(); } }最终50W条记录插入完毕:控制台显示:
而且我们打开文本文件确定这些时间戳的样本都被记录了: