Kafka+Log4j实现日志集中管理(3)

package com.demo.kafka;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class MyProducer {
    private static final String TOPIC = "kafka";
    private static final String CONTENT = "This is a single message";
    private static final String BROKER_LIST = "localhost:9092";
    private static final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder";
   
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("serializer.class", SERIALIZER_CLASS);
        props.put("metadata.broker.list", BROKER_LIST);
       
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
 
        //Send one message.
        KeyedMessage<String, String> message =
            new KeyedMessage<String, String>(TOPIC, CONTENT);
        producer.send(message);
       
        //Send multiple messages.
        List<KeyedMessage<String,String>> messages =
            new ArrayList<KeyedMessage<String, String>>();
        for (int i = 0; i < 5; i++) {
            messages.add(new KeyedMessage<String, String>
                (TOPIC, "Multiple message at a time. " + i));
        }
        producer.send(messages);
    }
}

到这里,代码就结束了。
 
第三部分 运行与验证

先运行MyConsumer,使其处于监听状态。同时,还可以启动Kafka自带的ConsoleConsumer来验证是否跟MyConsumer的结果一致。最后运行App.java。

先来看看MyConsumer的输出:

Kafka+Log4j实现日志集中管理

再来看看ConsoleConsumer的输出:

Kafka+Log4j实现日志集中管理

可以看到,尽管发往Kafka的消息去往了不同的地方,但是内容是一样的,而且一条也不少。最后再来看看Kafka的日志。

我们知道,Topic为kafka的消息有4个partion,从之前的截图可知这4个partion均匀分布在4个kafka节点上,于是我对每一个partion随机选取一个节点查看了日志内容。

上图中黄色选中部分依次代表在server0上查看partion0,在server1上查看partion1,以此类推。

而红色部分是日志内容,由于在创建Topic时准备将20条日志分成4个区存储,可以很清楚的看到,这20条日志确实是很均匀的存储在了几个partion上。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/16a5d2934ecb0a895e73c95eac86364f.html