consul.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package consul
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "mop-account-system/common"
  6. "mop-account-system/config"
  7. "net"
  8. "net/http"
  9. "strconv"
  10. "time"
  11. consulapi "github.com/hashicorp/consul/api"
  12. "github.com/hashicorp/consul/api/watch"
  13. uuid "github.com/satori/go.uuid"
  14. )
  15. var (
  16. mopConsul *MopConsul
  17. )
  18. type MopConsul struct {
  19. CheckPort string
  20. NodeId string
  21. ConsulClient *consulapi.Client
  22. IsRegSucc bool
  23. }
  24. func InitConsul() {
  25. mopConsul = NewMopConsul()
  26. mopConsul.Init()
  27. }
  28. func NewMopConsul() *MopConsul {
  29. var m MopConsul
  30. m.CheckPort = "9091"
  31. m.NodeId = uuid.NewV4().String()
  32. m.IsRegSucc = false
  33. var err error
  34. cfg := consulapi.DefaultConfig()
  35. cfg.Address = config.GetConfig().ConsulAddr
  36. m.ConsulClient, err = consulapi.NewClient(cfg)
  37. if err != nil {
  38. panic(err)
  39. }
  40. return &m
  41. }
  42. func (m *MopConsul) Init() {
  43. m.InitCheck() //初始化check接口
  44. time.Sleep(1 * time.Second)
  45. m.RegisterServer() //注册服务
  46. go m.WatchService() //服务是否发生change
  47. go m.SyncServiceHealth(60) //主动拉取健康状态
  48. }
  49. func (m *MopConsul) InitCheck() {
  50. http.HandleFunc("/"+m.NodeId+"/check", func(w http.ResponseWriter, r *http.Request) {
  51. w.Write([]byte("success"))
  52. })
  53. go http.ListenAndServe(":"+m.CheckPort, nil)
  54. }
  55. func (m *MopConsul) GetLocalIP() string {
  56. addrs, err := net.InterfaceAddrs()
  57. if err != nil {
  58. return ""
  59. }
  60. for _, address := range addrs {
  61. if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
  62. if ipnet.IP.To4() != nil {
  63. return ipnet.IP.String()
  64. }
  65. }
  66. }
  67. return ""
  68. }
  69. func (m *MopConsul) RegisterServer() {
  70. registration := new(consulapi.AgentServiceRegistration)
  71. registration.ID = m.NodeId
  72. registration.Name = config.GetConfig().ServerName
  73. registration.Port, _ = strconv.Atoi(config.GetConfig().GrpcPort)
  74. registration.Tags = []string{config.GetConfig().ConsulTag}
  75. registration.Address = m.GetLocalIP()
  76. fmt.Println("localIP=" + registration.Address)
  77. registration.Check = &consulapi.AgentServiceCheck{
  78. CheckID: m.NodeId,
  79. HTTP: "http://" + registration.Address + ":" + m.CheckPort + "/" + m.NodeId + "/check",
  80. Timeout: "5s",
  81. Interval: "30s",
  82. DeregisterCriticalServiceAfter: "35s", //check失败后35秒删除本服务
  83. }
  84. err := m.ConsulClient.Agent().ServiceRegister(registration)
  85. if err != nil {
  86. fmt.Errorf("ServiceRegister err:" + err.Error())
  87. if m.IsRegSucc == false {
  88. panic(err)
  89. }
  90. }
  91. m.IsRegSucc = true
  92. fmt.Println("Consul RegisterServer success")
  93. }
  94. func (m *MopConsul) WatchService() {
  95. var notifyCh = make(chan struct{})
  96. watchContent := `{"type":"service", "service":"` + config.GetConfig().ServerName + `", "tag":"` + config.GetConfig().ConsulTag + `"}`
  97. plan := m.WatchParse(watchContent)
  98. plan.Handler = func(idx uint64, raw interface{}) {
  99. fmt.Println("service change...")
  100. if raw == nil {
  101. return // ignore
  102. }
  103. fmt.Println("do something...")
  104. notifyCh <- struct{}{}
  105. }
  106. go func() {
  107. if err := plan.Run(config.GetConfig().ConsulAddr); err != nil {
  108. panic(err)
  109. }
  110. }()
  111. defer plan.Stop()
  112. for {
  113. select {
  114. case <-notifyCh:
  115. m.RegisterAgain()
  116. }
  117. }
  118. }
  119. func (m *MopConsul) SyncServiceHealth(gap int) {
  120. for {
  121. time.Sleep(time.Duration(gap) * time.Second)
  122. isHealth := false
  123. services, _, err := m.ConsulClient.Health().Service(config.GetConfig().ServerName, config.GetConfig().ConsulTag, true, &consulapi.QueryOptions{WaitIndex: 0})
  124. if err != nil {
  125. fmt.Errorf("error retrieving instances from Consul: %s", err.Error())
  126. }
  127. for _, s := range services {
  128. if s.Service.ID == m.NodeId {
  129. isHealth = true
  130. break
  131. }
  132. }
  133. if !isHealth {
  134. m.RegisterServer()
  135. }
  136. }
  137. }
  138. func (m *MopConsul) WatchParse(q string) *watch.Plan {
  139. var params map[string]interface{}
  140. if err := json.Unmarshal([]byte(q), &params); err != nil {
  141. fmt.Errorf("Unmarshal err:" + err.Error())
  142. return nil
  143. }
  144. plan, err := watch.Parse(params)
  145. if err != nil {
  146. panic(err)
  147. }
  148. return plan
  149. }
  150. func (m *MopConsul) RegisterAgain() {
  151. if !m.CheckServiceIsOK() {
  152. m.RegisterServer()
  153. }
  154. }
  155. func (m *MopConsul) CheckServiceIsOK() bool {
  156. services, _ := m.ConsulClient.Agent().Services()
  157. isOK := false
  158. if len(services) > 0 {
  159. if value, ok := services[m.NodeId]; ok {
  160. fmt.Println("service=" + common.InterfaceToJsonString(value))
  161. if value.Service == config.GetConfig().ServerName {
  162. if value.Weights.Passing == 1 {
  163. isOK = true
  164. }
  165. }
  166. }
  167. }
  168. return isOK
  169. }