        if (logger.isDebugEnabled()) {
            logger.debug("{Event} " + eventTopic + " : " + eventKey
                    + " : " + eventBody);
            logger.debug("event #{}", processedEvents);
        }
        // create a message and add it to the batch buffer
        ProducerData<String, String> data =
                new ProducerData<String, String>(eventTopic, eventBody);
        messageList.add(data);
    }
    // publish the batch, then commit the channel transaction
    if (processedEvents > 0) {
        long startTime = System.nanoTime();
        // send the buffered batch to Kafka (producer is the sink's
        // Kafka producer instance, initialized outside this fragment)
        producer.send(messageList);
        long endTime = System.nanoTime();
        logger.debug("sent {} events in {} ms", processedEvents,
                (endTime - startTime) / (1000 * 1000));
    }
    transaction.commit();
} catch (Exception ex) {
    String errorMsg = "Failed to publish events";
    logger.error(errorMsg, ex);
    status = Status.BACKOFF;
    if (transaction != null) {
        try {
            transaction.rollback();
        } catch (Exception e) {
            logger.error("Transaction rollback failed", e);
            throw Throwables.propagate(e);
        }
    }
    throw new EventDeliveryException(errorMsg, ex);
} finally {
    if (transaction != null) {
        transaction.close();
    }
}
return status;
Next, edit the Flume configuration file and switch the sink section to the Kafka sink, for example:
producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.r.brokerList = bigdata-node00:9092
producer.sinks.r.requiredAcks = 1
producer.sinks.r.batchSize = 100
#producer.sinks.r.kafka.producer.type=async
#producer.sinks.r.kafka.customer.encoding=UTF-8
producer.sinks.r.topic = testFlume1
The type property must point to the fully qualified class name of the KafkaSink.
The properties below it are all Kafka settings; the most important ones are brokerList and topic.
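As a rough illustration of how these properties reach the sink, here is a minimal sketch of a custom sink's configure() method. This is an assumption-based stand-in, not the actual plugin source; the property names mirror the sink section above:

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

// Simplified stand-in showing how the flume config properties above
// would be read by a custom sink; not the real KafkaSink source.
public class KafkaSink extends AbstractSink implements Configurable {
    private String topic;
    private String brokerList;
    private String requiredAcks;

    @Override
    public void configure(Context context) {
        // property names match producer.sinks.r.* in the config above
        topic = context.getString("topic", "testFlume1");
        brokerList = context.getString("brokerList");
        requiredAcks = context.getString("requiredAcks", "1");
        // these values would then be passed into the Kafka producer's Properties
    }

    @Override
    public Status process() throws EventDeliveryException {
        // the real batching/commit logic is shown in the fragment earlier
        return Status.READY;
    }
}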
Now restart Flume, and the collected log events can be seen under the corresponding Kafka topic.
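To verify this, a small standalone consumer will do. The sketch below assumes the modern Kafka Java client (org.apache.kafka.clients.consumer.KafkaConsumer) rather than the older API used in the sink code above; the group id flume-verify is arbitrary, while the broker address and topic come from the configuration:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicVerifier {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "bigdata-node00:9092"); // broker from the config above
        props.put("group.id", "flume-verify");                 // arbitrary group id
        props.put("auto.offset.reset", "earliest");            // read the topic from the beginning
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            consumer.subscribe(Collections.singletonList("testFlume1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}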