RabbitMQ 和 Kafka 简单的性能测试

测试环境:Ubuntu 15.10 64位

cpu:inter core i7-4790 3.60GHZ * 8

内存:16GB

硬盘:ssd 120GB

软件环境:rabbmitmq 3.6.0  kafka0.8.1  (均为单机本机运行)

PS: 测试结果均为单操作测试,即生产的时候没有消费操作

测试结果:

kafka :消费速度: 37,586 /s  生产速度: 448,753 /s

rabbitmq: 消费速度: 20,807 /s  生产速度  16.413 /s

出现问题:

rabbitmq 生产4分钟左右出现队列阻塞,无法继续添加数据,1分钟后恢复,再过大约1分钟又出现此现象并以约1分钟为间隔出现此问题。

rabbitmq 生产对象时有不小的几率(约 1/20)添加队列失败,报出的错误是“tcp链接重置”

其他并无任何问题

结论:

很明显的看出kafka的性能远超rabbitmq。不过这也是理所当然的,毕竟2个消息队列实现的协议是不一样的,处理消息的场景也大有不同。rabbitmq适合处理一些数据严谨的消息,比如说支付消息,社交消息等不能丢失的数据。kafka是批量操作切不报证数据是否能完整的到达消费者端,所以适合一些大量的营销消息的场景。

代码:

kafka:

package main
import (
    "github.com/Shopify/sarama"
    "os"
    "os/signal"
    "sync"
    "log"
    "time"
)


func main() {
    go producer()
//    go consumer()
    time.Sleep(10*time.Minute)
}

func producer()  {
    config :=sarama.NewConfig()
    config.Producer.Return.Successes = true
    proder,err := sarama.NewAsyncProducer([]string{"localhost:9092"},config)
    if err != nil {
        panic(err)
    }

signals :=make(chan  os.Signal,1)
    signal.Notify(signals,os.Interrupt)

var (
        wg                          sync.WaitGroup
        enqueued, successes, errors int
    )

wg.Add(1)
    go func() {
        defer  wg.Done()
        for _=range proder.Successes(){
            successes++
        }
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        for err := range proder.Errors(){
            log.Println(err)
            errors++
        }
    }()

go func() {
        t1 := time.NewTicker(time.Second)
        for{
            <- t1.C
            log.Println(enqueued)
        }
    }()

ProducerLoop:

for{
        message :=&sarama.ProducerMessage{Topic:"test",Value:sarama.StringEncoder("testing 123")}
        select {
        case proder.Input() <- message:
            enqueued++

case <- signals:
            proder.AsyncClose()
            break ProducerLoop
        }

}

wg.Wait()
    log.Println("Successfully produced:%d;errors:%d\n",successes,errors)

}

func consumer()  {
    coner,err := sarama.NewConsumer([]string{"localhost:9092"},nil)
    if err != nil {
        panic(err)
    }

defer func() {
        if err :=coner.Close(); err !=nil{
            log.Fatalln(err)
        }
    }()

partitionConsumer ,err := coner.ConsumePartition("test",0,sarama.OffsetNewest)
    if err != nil {
        panic(err)
    }

defer func() {
        if err := partitionConsumer.Close();err!=nil{
            log.Fatalln(err)
        }
    }()


    signals := make(chan os.Signal,1)
    signal.Notify(signals,os.Interrupt)
    consumed:=0

go func() {
        t1 := time.NewTicker(time.Second)
        for{
            <- t1.C
            log.Println(consumed)
        }
    }()

ConsumerLoop:
    for{
        select {
        case _ = <-partitionConsumer.Messages():

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/527ee30430c9d996bbdbba72bc2a3239.html