consumed++
// log.Println( string(msg.Value)," => ",consumed)
case <-signals:
break ConsumerLoop
}
}
log.Printf("Consumed: %d\n", consumed)
}
rabbitmq:
package main
import (
"github.com/streadway/amqp"
"time"
"fmt"
"log"
)
const (
queueName = "push.msg.q"
exchange = "t.msg.ex"
mqurl ="amqp://shimeng:shimeng1015@192.168.155.106:5672/push"
)
var conn *amqp.Connection
var channel *amqp.Channel
func main() {
fmt.Println(1)
// push()
receive()
// fmt.Println("end")
// close()
}
func failOnErr(err error, msg string) {
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(fmt.Sprintf("%s:%s", msg, err))
}
}
func mqConnect() {
var err error
conn, err = amqp.Dial(mqurl)
if err != nil {
log.Println(1)
log.Fatalln(err)
}
fmt.Println(5)
channel, err = conn.Channel()
if err != nil {
fmt.Println(2)
log.Fatalln(err)
}else {
fmt.Println("a")
}
}
func push() {
count := 0
if channel == nil {
fmt.Println(2)
mqConnect()
}else {
fmt.Println(3)
}
msgContent := "hello world!"
t1 := time.NewTicker(time.Second)
go func() {
for{
<- t1.C
log.Println(count)
}
}()
for{
err := channel.Publish(exchange, "test", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msgContent),
})
if err != nil {
}else {
count ++
}
}
}
func receive() {
if channel == nil {
mqConnect()
}
count :=0
msgs, err := channel.Consume(queueName, "", true, false, false, false, nil)
failOnErr(err, "")
forever := make(chan bool)
t1 := time.NewTicker(time.Second)
go func() {
for{
<- t1.C
log.Println(count)
}
}()
go func() {
//fmt.Println(*msgs)
for _= range msgs {
count ++
// s := BytesToString(&(d.Body))
// count++
// fmt.Printf("receve msg is :%s -- %d\n", *s, count)
}
}()
fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
<-forever
}
CentOS 5.6 安装RabbitMQ
用Python尝试RabbitMQ