InitKafka.go
package kafka

var (
	kafkaClient *Client
)

func InitKafka() {
	var err error
	var config = Config{
		Host: []string{"kafka:9092"},
	}
	kafkaClient, err = NewClient(config)
	if err != nil {
		panic(err)
	}
}

func GetClient() *Client {
	return kafkaClient
}
package kafka import ( "errors" "github.com/Shopify/sarama" ) type Client struct { sarama.AsyncProducer msgPool chan *sarama.ProducerMessage } type Config struct { Host []string `json:"host"` ReturnSuccess bool `json:"return_success"` ReturnErrors bool `json:"return_errors"` } func NewClient(cfg Config) (*Client, error) { // create client var err error c := &Client{ msgPool: make(chan *sarama.ProducerMessage, 2000), } config := sarama.NewConfig() config.Producer.Return.Errors = cfg.ReturnErrors config.Producer.Return.Successes = cfg.ReturnSuccess config.Version = sarama.V2_0_0_0 c.AsyncProducer, err = sarama.NewAsyncProducer(cfg.Host, config) if err != nil { return nil, err } return c, nil } // run func (c *Client) Run() { for { select { case msg := <-c.msgPool: c.Input() <- msg logger.Info("%+v", msg) } } } // send msg func (c *Client) Send(topic string, msg []byte) error { if topic == "" { return errors.New("kafka producer send msg topic empty") } kafkaMsg := &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(msg), } c.msgPool <- kafkaMsg return nil }生产者初始化:
Producer initialization:

// kafka init
kafka.InitKafka()
go kafka.GetClient().Run()
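With the producer initialized, business code publishes through the Send helper. A minimal end-to-end sketch, assuming the package above lives at the import path yourproject/kafka; the topic name and payload are placeholders, not values from the original project:

package main

import (
	"log"

	"yourproject/kafka" // assumption: module path of the kafka package above
)

func main() {
	kafka.InitKafka()
	go kafka.GetClient().Run()

	// "topicA" and the JSON payload are placeholder values.
	if err := kafka.GetClient().Send("topicA", []byte(`{"order_id": 1001}`)); err != nil {
		log.Printf("send to kafka failed: %v", err)
	}

	// In a real service the process keeps running, so Run has time to
	// flush the queued message; this snippet only illustrates the call.
}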
consumer.go

package kafka_consumer

import (
	"context"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready chan bool
}

func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		logger.Info("Message claimed: value = %s, timestamp = %v, topic = %s",
			string(message.Value), message.Timestamp, message.Topic)
		session.MarkMessage(message, "")
		c.Handler(message.Topic, message.Value)
	}
	return nil
}

// Handler dispatches a message to the business handler for its topic.
// conscom is the project's constants package (not shown here).
func (c *Consumer) Handler(topic string, msg []byte) {
	switch topic {
	case conscom.KafkaTopicGiftOrder:
		GiftOrder(topic, msg)
	case conscom.KafkaTopicFollow:
		UserFollow(topic, msg)
	}
}

func ConsumeInit(topics []string, groupID string) {
	consumer := Consumer{
		ready: make(chan bool),
	}
	brokerList := []string{"kafka:9092"}
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(brokerList, groupID, config)
	if err != nil {
		log.Printf("kafka consumer err %v", err)
		return
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called in a loop: when a server-side rebalance
			// happens, the consumer session will need to be recreated
			if err := client.Consume(ctx, topics, &consumer); err != nil {
				log.Printf("kafka consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Printf("kafka consume gift terminating: context cancelled")
	case <-sigterm:
		log.Printf("kafka consume gift terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Printf("kafka consume gift Error closing client: %v", err)
	}
}
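Handler dispatches on topic constants from conscom and calls GiftOrder / UserFollow, none of which are shown above. A rough sketch of their shape in the same package, where the constant values and handler bodies are assumptions, not the original project's code:

// Assumed layout of the conscom constants package:
// const (
//	KafkaTopicGiftOrder = "gift-order"
//	KafkaTopicFollow    = "user-follow"
// )

// GiftOrder is a placeholder business handler; the real logic is project-specific.
func GiftOrder(topic string, msg []byte) {
	logger.Info("gift order message on %s: %s", topic, string(msg))
	// TODO: unmarshal msg and process the order
}

// UserFollow is a placeholder business handler; the real logic is project-specific.
func UserFollow(topic string, msg []byte) {
	logger.Info("follow message on %s: %s", topic, string(msg))
	// TODO: unmarshal msg and process the follow event
}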
Consumer initialization:

// kafka consumer
go kafka_consumer.ConsumeInit([]string{"topicA", "topicB"}, "group-name")

Reference: 《深入理解Kafka：核心设计与实践原理》 (Understanding Kafka in Depth: Core Design and Practice), by Zhu Zhonghua
https://github.com/Shopify/sarama
https://crossoverjie.top/2018/11/20/kafka/kafka-consumer/