使用flume+kafka+storm构建实时日志分析系统(2)

if (logger.isDebugEnabled()) {
        logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
        + eventBody);
        logger.debug("event #{}", processedEvents);
        }

// create a message and add to buffer
        ProducerData<String, String> data = new ProducerData<String, String>
        (eventTopic, eventBody);
        messageList.add(data);
            }
           
            // publish batch and commit.
    if (processedEvents > 0) {
    long startTime = System.nanoTime();
    long endTime = System.nanoTime();
    }
        }
       
        transaction.commit();
    } catch (Exception ex) {
        String errorMsg = "Failed to publish events";
        logger.error("Failed to publish events", ex);
        status = Status.BACKOFF;
        if (transaction != null) {
          try {
            transaction.rollback();
          } catch (Exception e) {
            logger.error("Transaction rollback failed", e);
            throw Throwables.propagate(e);
          }
        }
        throw new EventDeliveryException(errorMsg, ex);
      } finally {
        if (transaction != null) {
          transaction.close();
        }
      }
   
    return status;
    下一步,修改flume配置文件,将其中sink部分的配置改成kafka sink,如:
   

producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink

producer.sinks.r.brokerList = bigdata-node00:9092
producer.sinks.r.requiredAcks = 1
producer.sinks.r.batchSize = 100
#producer.sinks.r.kafka.producer.type=async
#producer.sinks.r.kafka.customer.encoding=UTF-8
producer.sinks.r.topic = testFlume1
    type指向kafkasink所在的完整路径
    下面的参数都是kafka的一系列参数,最重要的是brokerList和topic参数

现在重新启动flume,就可以在kafka的对应topic下查看到对应的日志

分布式发布订阅消息系统 Kafka 架构设计

Apache Kafka 代码实例

Apache Kafka 教程笔记

Apache kafka原理与特性(0.8V) 

Kafka部署与代码实例 

Kafka介绍和集群环境搭建 

Kafka 的详细介绍请点这里
Kafka 的下载地址请点这里

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/09c1f25fbdc95d573ef4d91f3dd2e40f.html