public static void main(String[] args) {
final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
zkConfig.zkConnect = "10.168.140.48:2181";
metaClientConfig.setZkConfig(zkConfig);
MessageSessionFactory sessionFactory = null;
try {
sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
} catch (MetaClientException e) {
e.printStackTrace();
}
MessageProducer producer = sessionFactory.createProducer();
final String topic = "test";
producer.publish(topic);
BufferedReader reader = new BufferedReader(new InputStreamReader(
System.in));
String line = "qiujinyong";
try {
while ((line = reader.readLine()) != null) {
SendResult sendResult = producer.sendMessage(new Message(topic,
line.getBytes()));
if (!sendResult.isSuccess()) {
System.err.println("Send message failed,error message:"
+ sendResult.getErrorMessage());
} else {
System.out.println("Send message successfully,sent to "
+ sendResult.getPartition());
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MetaClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
打包test.jar后,传服务器上 java -cp test.jar com.test.metaq.Products 命令行输入message
打包test.jar后,传服务器上 java -cp test.jar com.test.metaq.AsyncConsum 命令行会接收到message
ZooKeeper 的详细介绍:请点这里
ZooKeeper 的下载地址:请点这里
相关阅读:
分布式服务框架 ZooKeeper -- 管理分布式环境中的数据