服务注册与发现:Consul
原创大约 5 分钟
集成&开发
环境和脚本
> go get -u github.com/hashicorp/consul/api
> go get -u github.com/mbobakov/grpc-consul-resolver
> docker run -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8500:8500 -p 8600:8600 -d consul:1.15.4 consul agent -dev -client=0.0.0.0
# 或者本地环境运行:
> ./consul agent -dev -client=0.0.0.0
# 要测试负载均衡,可通过将同一个服务启动两次的方法来实现
# 1、先直接在goland中用源代码启动
# 2、然后在goland的Terminal启动(执行以下命令)
> cd ~/workspace-go/gomicroservice
> go run userapi/main.go
Gin服务接口
1. 配置结构体
......
type ConsulConfig struct {
Host string `mapstructure:"host" json:"host"`
Port int `mapstructure:"port" json:"port"`
}
type ServerConfig struct {
Env string `mapstructure:"env" json:"env"`
Name string `mapstructure:"name" json:"name"`
Host string `mapstructure:"host" json:"host"`
Port int `mapstructure:"port" json:"port"`
UserSrvInfo UserSrv `mapstructure:"usersrv" json:"usersrv"`
JWTInfo JWTConfig `mapstructure:"jwt" json:"jwt"`
AliSmsInfo AliSmsConfig `mapstructure:"sms" json:"sms"`
RedisInfo RedisConfig `mapstructure:"redis" json:"redis"`
ConsulInfo ConsulConfig `mapstructure:"consul" json:"consul"`
LoggerInfo LoggerConfig `mapstructure:"logger" json:"logger"`
}
......
2. 初始化和grpc服务的连接
package initialize
import (
"fmt"
"github.com/hashicorp/consul/api"
_ "github.com/mbobakov/grpc-consul-resolver"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"gomicroservice/userapi/global"
"gomicroservice/userapi/proto"
"gomicroservice/userapi/utils/otgrpc"
"google.golang.org/grpc"
)
// InitialServerConnection 初始化服务连接
func InitialServerConnection() {
// gin从consul中获取到用户grpc服务
cfg := api.DefaultConfig()
consulInfo := global.ServerConfig.ConsulInfo
cfg.Address = fmt.Sprintf("%s:%d", consulInfo.Host, consulInfo.Port)
userSrvHost := ""
userSrvPort := 0
consulClient, _ := api.NewClient(cfg)
services, _ := consulClient.Agent().ServicesWithFilter(fmt.Sprintf(`Service == "%s"`, global.ServerConfig.UserSrvInfo.Name))
for _, value := range services {
userSrvHost = value.Address
userSrvPort = value.Port
}
if userSrvHost != "" && 0 != userSrvPort {
global.ServerConfig.UserSrvInfo.Host = userSrvHost
global.ServerConfig.UserSrvInfo.Port = userSrvPort
}
// 连接用户grpc服务,并实现负载均衡
dialString := fmt.Sprintf("consul://%s:%d/%s?wait=10s", global.ServerConfig.ConsulInfo.Host, global.ServerConfig.ConsulInfo.Port, global.ServerConfig.UserSrvInfo.Name)
conn, err := grpc.Dial(dialString,
grpc.WithInsecure(),
// 负载均衡
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
// 在轮询时加入链路追踪,但没有父Span,需要修改OpenTracingClientInterceptor源码
grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())),
)
if err != nil {
zap.S().Panicf("初始化服务连接失败:%s", err.Error())
}
// 调用接口
global.UserSrvClient = proto.NewUserServiceClient(conn)
}
3. 注册到Consul
package middleware
import (
"fmt"
"github.com/hashicorp/consul/api"
"go.uber.org/zap"
)
type Registry struct {
Host string
Port int
}
type RegistryClient interface {
Register(id, name, schema, address, path string, port int) error
DeRegister(serviceId string) error
}
// NewRegistryClient 返回一个注册客户端接口
func NewRegistryClient(host string, port int) RegistryClient {
return &Registry{
Host: host,
Port: port,
}
}
// Register 注册Consul服务
func (registry *Registry) Register(id, name, schema, address, path string, port int) error {
cfg := api.DefaultConfig()
// consul服务地址:172.16.185.168:8500
cfg.Address = fmt.Sprintf("%s:%d", registry.Host, registry.Port)
client, err := api.NewClient(cfg)
if err != nil {
panic(err)
}
check := &api.AgentServiceCheck{
// HTTP模式的健康检查需要完整的路径,例如:http://192.168.2.31:8080/v1/system/health
HTTP: fmt.Sprintf("%s%s:%d%s", schema, address, port, path),
Timeout: "5s",
Interval: "5s",
DeregisterCriticalServiceAfter: "10s",
}
registration := &api.AgentServiceRegistration{
ID: id,
Name: name,
Address: address, // 这里的address要和check中的服务IP地址一致
Port: port,
Tags: []string{name},
Check: check,
}
return client.Agent().ServiceRegister(registration)
}
// DeRegister 注销Consul服务
func (registry *Registry) DeRegister(id string) error {
cfg := api.DefaultConfig()
cfg.Address = fmt.Sprintf("%s:%d", registry.Host, registry.Port)
client, err := api.NewClient(cfg)
if err != nil {
return err
}
return client.Agent().ServiceDeregister(id)
}
4. 启动代码
package main
func main() {
......
// 8. 把自己注册到Consul,方便后续API网关kong的调用
// 随机生成服务ID
serviceId := utils.UUID()
registerClient := middleware.NewRegistryClient(global.ServerConfig.ConsulInfo.Host, global.ServerConfig.ConsulInfo.Port)
err := registerClient.Register(serviceId, global.ServerConfig.Name, global.ServerConfig.ConsulInfo.Schema, global.ServerConfig.Host, global.ServerConfig.ConsulInfo.Path, global.ServerConfig.Port)
if err != nil {
zap.S().Panicf("Consul服务 (%s - %s) 注册失败:%s", serviceId, global.ServerConfig.Name, err.Error())
}
zap.S().Infof("Consul服务 (%s - %s) 注册成功...", serviceId, global.ServerConfig.Name)
go func() {
if err := roots.Run(fmt.Sprintf(":%d", global.ServerConfig.Port)); err != nil {
zap.S().Panicf("启动用户gin接口失败:%s", err.Error())
return
}
}()
// 9. 优雅退出
quitSignal := make(chan os.Signal)
signal.Notify(quitSignal, syscall.SIGINT, syscall.SIGTERM)
<-quitSignal
// 注销Consul服务
defer func(id string) {
err := registerClient.DeRegister(id)
if err != nil {
zap.S().Errorf("Consul服务 (%s - %s) 注销失败:%s", id, global.ServerConfig.Name, err.Error())
} else {
zap.S().Infof("Consul服务 (%s - %s) 已注销...", id, global.ServerConfig.Name)
}
}(serviceId)
}
GRPC服务实现
1. 配置结构体
......
type ConsulConfig struct {
Host string `mapstructure:"host" json:"host"`
Port int `mapstructure:"port" json:"port"`
}
type ServerConfig struct {
Env string `mapstructure:"env" json:"env"`
Name string `mapstructure:"name" json:"name"`
Host string `mapstructure:"host" json:"host"`
Port int `mapstructure:"port" json:"port"`
ConsulInfo ConsulConfig `mapstructure:"consul" json:"consul"`
}
......
2. 中间件
package middleware
import (
"fmt"
"github.com/hashicorp/consul/api"
"gomicroservice/usersrv/global"
)
type Registry struct {
Host string
Port int
}
type RegistryClient interface {
Register(id, name, address string, port int) error
DeRegister(serviceId string) error
}
// NewRegistryClient 返回一个注册客户端接口
func NewRegistryClient(host string, port int) RegistryClient {
return &Registry{
Host: host,
Port: port,
}
}
// Register 注册Consul服务
func (registry *Registry) Register(id, name, address string, port int) error {
cfg := api.DefaultConfig()
// consul服务地址:172.16.185.168:8500
cfg.Address = fmt.Sprintf("%s:%d", registry.Host, registry.Port)
client, err := api.NewClient(cfg)
if err != nil {
panic(err)
}
check := &api.AgentServiceCheck{
// grpc服务地址,例如:192.168.2.31:9090
GRPC: fmt.Sprintf("%s:%d", global.ServerConfig.Host, port),
Timeout: "5s",
Interval: "5s",
DeregisterCriticalServiceAfter: "10s",
}
registration := &api.AgentServiceRegistration{
ID: id,
Name: name,
Address: address, // 这里的address要和check中的服务IP地址一致
Port: port,
Tags: []string{name},
Check: check,
}
return client.Agent().ServiceRegister(registration)
}
// DeRegister 注销Consul服务
func (registry *Registry) DeRegister(id string) error {
cfg := api.DefaultConfig()
cfg.Address = fmt.Sprintf("%s:%d", registry.Host, registry.Port)
client, err := api.NewClient(cfg)
if err != nil {
return err
}
return client.Agent().ServiceDeregister(id)
}
3. 启动代码
package main
func main() {
......
// 5. grpc服务健康检查
grpc_health_v1.RegisterHealthServer(server, health.NewServer())
// 6. Consul服务注册,将自动生成的端口号传进去
// 随机生成服务ID
serviceId := utils.UUID()
registerClient := middleware.NewRegistryClient(global.ServerConfig.ConsulInfo.Host, global.ServerConfig.ConsulInfo.Port)
err = registerClient.Register(serviceId, global.ServerConfig.Name, global.ServerConfig.Host, global.ServerConfig.Port)
if err != nil {
zap.S().Panicf("Consul服务 (%s - %s) 注册失败:%s", serviceId, global.ServerConfig.Name, err.Error())
}
zap.S().Infof("Consul服务 (%s - %s) 注册成功...", serviceId, global.ServerConfig.Name)
// 7. 启动服务
go func() {
err = server.Serve(list)
if err != nil {
zap.S().Panicf("启动用户grpc服务失败:" + err.Error())
}
}()
// 8. 优雅退出
quitSignal := make(chan os.Signal)
signal.Notify(quitSignal, syscall.SIGINT, syscall.SIGTERM)
<-quitSignal
// 服务注销时也关闭jaeger
defer func(closer io.Closer) {
err := closer.Close()
if err != nil {
zap.S().Errorf("注销时关闭Jaeger失败:" + err.Error())
}
}(closer)
// 注销Consul服务
defer func(id string) {
err := registerClient.DeRegister(id)
if err != nil {
zap.S().Errorf("Consul服务 (%s - %s) 注销失败:%s", id, global.ServerConfig.Name, err.Error())
} else {
zap.S().Infof("Consul服务 (%s - %s) 已注销...", id, global.ServerConfig.Name)
}
}(serviceId)
}
感谢支持
更多内容,请移步《超级个体》。