Go微服务基础组件(RocketMQ)
原创 · 阅读时间大约 2 分钟
环境和脚本
> go get -u github.com/apache/rocketmq-client-go/v2
Go发送普通消息
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
// main builds a RocketMQ producer, sends a single message to the "NewGo"
// topic synchronously, prints the outcome, and shuts the producer down.
func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNameServer([]string{"172.16.185.168:9876"}),
		producer.WithRetry(2),
	)
	if err != nil {
		// Fail here with the real cause instead of letting a possibly nil
		// producer blow up inside p.Start().
		panic(fmt.Sprintf("创建producer失败:%s", err.Error()))
	}

	if err := p.Start(); err != nil {
		panic(fmt.Sprintf("启动producer失败:%s", err.Error()))
	}

	msg := &primitive.Message{
		Topic: "NewGo",
		Body:  []byte("Hello RocketMQ Go Client"),
	}

	// A send failure is reported but does not abort: the producer is still
	// shut down below.
	if res, err := p.SendSync(context.Background(), msg); err != nil {
		fmt.Printf("发送消息失败:%s\n", err)
	} else {
		fmt.Printf("发送消息成功:result=%s\n", res.String())
	}

	if err := p.Shutdown(); err != nil {
		fmt.Printf("关闭producer失败:%s", err.Error())
	}
}
Go发送事务消息
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
// DemoListener is a stateless transaction listener used by the transactional
// producer demo; its methods simulate the local transaction and its check-back.
type DemoListener struct{}
// ExecuteLocalTransaction simulates the local business step that accompanies
// a transactional message. The active scenario prints a start message, sleeps
// for three seconds, prints an "abnormal" message and returns
// primitive.UnknowState. The two commented-out scenarios show the commit and
// rollback variants. msg is not inspected.
func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
	// Scenario 1: simulated local business succeeds.
	//fmt.Println("开始执行业务逻辑...")
	//time.Sleep(3 * time.Second)
	//fmt.Println("业务逻辑执行成功!")
	//return primitive.CommitMessageState

	// Scenario 2: simulated local business fails.
	//fmt.Println("开始执行业务逻辑...")
	//time.Sleep(3 * time.Second)
	//fmt.Println("业务逻辑执行失败!")
	//return primitive.RollbackMessageState

	// Scenario 3 (active): abnormal execution such as a crash or outage.
	// The state is left undecided; CheckLocalTransaction is the method
	// written to handle this case.
	const simulatedWork = 3 * time.Second

	fmt.Println("开始执行业务逻辑...")
	time.Sleep(simulatedWork)
	fmt.Println("业务逻辑执行异常!")

	outcome := primitive.UnknowState
	return outcome
}
// CheckLocalTransaction simulates the check-back for a transaction whose
// local execution ended in the undecided state (scenario 3 of
// ExecuteLocalTransaction). It prints a notice, sleeps for ten seconds,
// prints the result and always returns primitive.CommitMessageState.
// msg is not inspected.
func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
	const simulatedLookup = 10 * time.Second

	fmt.Println("RocketMQ消息回查")
	time.Sleep(simulatedLookup)
	fmt.Println("回查结果:CommitMessageState")

	verdict := primitive.CommitMessageState
	return verdict
}
// main builds a transactional producer backed by DemoListener, sends one
// transactional message to the "NewGo" topic, then keeps the process alive
// for a long time before shutting the producer down.
func main() {
	p, err := rocketmq.NewTransactionProducer(
		&DemoListener{},
		producer.WithNameServer([]string{"172.16.185.168:9876"}),
		producer.WithRetry(1),
	)
	if err != nil {
		// Report the construction failure instead of calling Start on a
		// producer that may be nil.
		fmt.Printf("创建producer错误:%s\n", err.Error())
		os.Exit(1)
	}

	if err := p.Start(); err != nil {
		fmt.Printf("启动producer错误:%s\n", err.Error())
		os.Exit(1)
	}

	res, err := p.SendMessageInTransaction(
		context.Background(),
		primitive.NewMessage("NewGo", []byte("这是一条事务消息")),
	)
	if err != nil {
		fmt.Printf("发送消息错误:%s\n", err)
	} else {
		fmt.Printf("发送消息成功:result=%s\n", res.String())
	}

	// Keep the process running; presumably this leaves time for the broker's
	// check-back to reach DemoListener.CheckLocalTransaction.
	time.Sleep(30 * time.Minute)

	if err := p.Shutdown(); err != nil {
		fmt.Printf("关闭producer错误: %s", err.Error())
	}
}
Go消费普通消息
package main
import (
	"context"
	"fmt"
	"os"
	"os/signal"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)
// main starts a push consumer in group "testGroup", subscribes to the "NewGo"
// topic, prints every message it receives, and shuts the consumer down when
// the process is interrupted (for example with Ctrl+C).
func main() {
	// Register for the interrupt signal. Without signal.Notify nothing is
	// ever sent on the channel, so the receive below would block forever and
	// Shutdown would never run. The channel is buffered because the signal
	// package does not block when delivering a signal.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)

	c, err := rocketmq.NewPushConsumer(
		consumer.WithNameServer([]string{"172.16.185.168:9876"}),
		consumer.WithGroupName("testGroup"),
	)
	if err != nil {
		// Report the construction failure instead of using a consumer that
		// may be nil.
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	// An empty MessageSelector is passed, i.e. no explicit filter expression.
	err = c.Subscribe("NewGo", consumer.MessageSelector{},
		func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
			for _, m := range msgs {
				fmt.Printf("获取到值:%v\n", m)
			}
			return consumer.ConsumeSuccess, nil
		})
	if err != nil {
		// Starting a consumer whose only subscription failed is pointless.
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	if err := c.Start(); err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	// Block until an interrupt arrives, then shut down cleanly.
	<-sig

	if err := c.Shutdown(); err != nil {
		fmt.Printf("关闭消费者失败:%s", err.Error())
	}
}
感谢支持
更多内容,请移步《超级个体》。