apollo_client.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. package agollo
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "net"
  8. "net/http"
  9. "net/url"
  10. )
  11. // https://github.com/ctripcorp/apollo/wiki/%E5%85%B6%E5%AE%83%E8%AF%AD%E8%A8%80%E5%AE%A2%E6%88%B7%E7%AB%AF%E6%8E%A5%E5%85%A5%E6%8C%87%E5%8D%97
  12. type ApolloClient interface {
  13. Notifications(configServerURL, appID, clusterName string, notifications []Notification) (int, []Notification, error)
  14. // 该接口会直接从数据库中获取配置,可以配合配置推送通知实现实时更新配置。
  15. GetConfigsFromNonCache(configServerURL, appID, cluster, namespace string, opts ...NotificationsOption) (int, *Config, error)
  16. // 该接口会从缓存中获取配置,适合频率较高的配置拉取请求,如简单的每30秒轮询一次配置。
  17. GetConfigsFromCache(configServerURL, appID, cluster, namespace string) (Configurations, error)
  18. }
  // Notification is a single entry of the notifications list exchanged with the
  // config server.
  type Notification struct {
  	// NamespaceName is the namespace the entry refers to, e.g. "application".
  	NamespaceName string `json:"namespaceName"`
  	// NotificationID is the notification id, e.g. 107.
  	NotificationID int `json:"notificationId"`
  }

  // Notifications is a list of Notification values that can render itself as
  // JSON.
  type Notifications []Notification

  // String returns the JSON encoding of n. The error from json.Marshal is
  // discarded; if marshalling fails the result is the empty string.
  func (n Notifications) String() string {
  	encoded, _ := json.Marshal(n)
  	return string(encoded)
  }
  // NotificationsOptions holds the optional settings that NotificationsOption
  // values fill in.
  type NotificationsOptions struct {
  	// ReleaseKey is the release key set by the ReleaseKey option.
  	ReleaseKey string
  }

  // NotificationsOption is a functional option that mutates a
  // NotificationsOptions value.
  type NotificationsOption func(*NotificationsOptions)

  // ReleaseKey returns an option that stores releaseKey in
  // NotificationsOptions.ReleaseKey.
  func ReleaseKey(releaseKey string) NotificationsOption {
  	apply := func(target *NotificationsOptions) {
  		target.ReleaseKey = releaseKey
  	}
  	return apply
  }
  37. type Config struct {
  38. AppID string `json:"appId"` // appId: "AppTest",
  39. Cluster string `json:"cluster"` // cluster: "default",
  40. NamespaceName string `json:"namespaceName"` // namespaceName: "TEST.Namespace1",
  41. Configurations Configurations `json:"configurations"` // configurations: {Name: "Foo"},
  42. ReleaseKey string `json:"releaseKey"` // releaseKey: "20181017110222-5ce3b2da895720e8"
  43. }
  // Doer is the HTTP transport used by apolloClient: anything that can execute
  // an *http.Request. *http.Client satisfies it.
  type Doer interface {
  	Do(*http.Request) (*http.Response, error)
  }

  // apolloClient is the ApolloClient implementation configured through
  // ApolloClientOption values.
  type apolloClient struct {
  	// Doer executes the HTTP requests.
  	Doer Doer
  	// IP is sent to the config server as the "ip" query parameter.
  	IP string
  	// ConfigType is the namespace format. The default, properties, needs no
  	// suffix after the namespace; for other types the suffix is appended,
  	// e.g. application.json {xml,yml,yaml,json,...}.
  	ConfigType string
  }

  // ApolloClientOption is a functional option that customises an apolloClient.
  type ApolloClientOption func(*apolloClient)

  // WithDoer returns an option that sets the HTTP transport to d.
  func WithDoer(d Doer) ApolloClientOption {
  	setDoer := func(client *apolloClient) {
  		client.Doer = d
  	}
  	return setDoer
  }

  // WithIP returns an option that sets the client IP to ip.
  func WithIP(ip string) ApolloClientOption {
  	setIP := func(client *apolloClient) {
  		client.IP = ip
  	}
  	return setIP
  }

  // WithConfigType returns an option that sets the namespace format to
  // configType.
  func WithConfigType(configType string) ApolloClientOption {
  	setConfigType := func(client *apolloClient) {
  		client.ConfigType = configType
  	}
  	return setConfigType
  }
  68. func NewApolloClient(opts ...ApolloClientOption) ApolloClient {
  69. c := &apolloClient{}
  70. for _, opt := range opts {
  71. opt(c)
  72. }
  73. if c.Doer == nil {
  74. c.Doer = &http.Client{
  75. Timeout: defaultClientTimeout, // Notifications由于服务端会hold住请求60秒,所以请确保客户端访问服务端的超时时间要大于60秒。
  76. }
  77. }
  78. if c.IP == "" {
  79. c.IP = localIP
  80. }
  81. if c.ConfigType == "" {
  82. c.ConfigType = defaultConfigType
  83. }
  84. return c
  85. }
  86. func (c *apolloClient) Notifications(configServerURL, appID, cluster string, notifications []Notification) (status int, result []Notification, err error) {
  87. configServerURL = normalizeURL(configServerURL)
  88. url := fmt.Sprintf("%s/notifications/v2?appId=%s&cluster=%s&notifications=%s",
  89. configServerURL,
  90. url.QueryEscape(appID),
  91. url.QueryEscape(cluster),
  92. url.QueryEscape(Notifications(notifications).String()),
  93. )
  94. var req *http.Request
  95. req, err = http.NewRequest("GET", url, nil)
  96. if err != nil {
  97. return
  98. }
  99. var body []byte
  100. status, body, err = parseResponseBody(c.Doer, req)
  101. if err != nil {
  102. return
  103. }
  104. if status == http.StatusOK {
  105. err = json.Unmarshal(body, &result)
  106. return
  107. }
  108. return
  109. }
  110. func (c *apolloClient) GetConfigsFromNonCache(configServerURL, appID, cluster, namespace string, opts ...NotificationsOption) (status int, config *Config, err error) {
  111. var options = NotificationsOptions{}
  112. for _, opt := range opts {
  113. opt(&options)
  114. }
  115. configServerURL = normalizeURL(configServerURL)
  116. url := fmt.Sprintf("%s/configs/%s/%s/%s?releaseKey=%s&ip=%s",
  117. configServerURL,
  118. url.QueryEscape(appID),
  119. url.QueryEscape(cluster),
  120. url.QueryEscape(c.getNamespace(namespace)),
  121. options.ReleaseKey,
  122. c.IP,
  123. )
  124. var req *http.Request
  125. req, err = http.NewRequest("GET", url, nil)
  126. if err != nil {
  127. return
  128. }
  129. var body []byte
  130. status, body, err = parseResponseBody(c.Doer, req)
  131. if err != nil {
  132. return
  133. }
  134. if status == http.StatusOK {
  135. config = new(Config)
  136. err = json.Unmarshal(body, config)
  137. return
  138. }
  139. return
  140. }
  141. func (c *apolloClient) GetConfigsFromCache(configServerURL, appID, cluster, namespace string) (config Configurations, err error) {
  142. configServerURL = normalizeURL(configServerURL)
  143. url := fmt.Sprintf("%s/configfiles/json/%s/%s/%s?ip=%s",
  144. configServerURL,
  145. url.QueryEscape(appID),
  146. url.QueryEscape(cluster),
  147. url.QueryEscape(c.getNamespace(namespace)),
  148. c.IP,
  149. )
  150. var req *http.Request
  151. req, err = http.NewRequest("GET", url, nil)
  152. if err != nil {
  153. return
  154. }
  155. var (
  156. body []byte
  157. status int
  158. )
  159. status, body, err = parseResponseBody(c.Doer, req)
  160. if err != nil {
  161. return
  162. }
  163. if status == http.StatusOK {
  164. config = make(Configurations)
  165. err = json.Unmarshal(body, &config)
  166. } else {
  167. err = errors.New(string(body))
  168. }
  169. return
  170. }
  171. // 配置文件有多种格式,例如:properties、xml、yml、yaml、json等。同样Namespace也具有这些格式。在Portal UI中可以看到“application”的Namespace上有一个“properties”标签,表明“application”是properties格式的。
  172. // 如果使用Http接口直接调用时,对应的namespace参数需要传入namespace的名字加上后缀名,如datasources.json。
  173. func (c *apolloClient) getNamespace(namespace string) string {
  174. if c.ConfigType == "" || c.ConfigType == defaultConfigType {
  175. return namespace
  176. }
  177. return namespace + "." + c.ConfigType
  178. }
  // getLocalIP returns the string form of the first non-loopback IPv4 address
  // reported by net.InterfaceAddrs. It returns "" when the addresses cannot be
  // listed or when none of them qualifies.
  func getLocalIP() string {
  	addrs, err := net.InterfaceAddrs()
  	if err != nil {
  		return ""
  	}

  	for i := range addrs {
  		ipNet, isIPNet := addrs[i].(*net.IPNet)
  		// Skip anything that is not an IP network or that is a loopback.
  		if !isIPNet || ipNet.IP.IsLoopback() {
  			continue
  		}
  		// To4 returns nil for addresses that are not IPv4.
  		if ipNet.IP.To4() == nil {
  			continue
  		}
  		return ipNet.IP.String()
  	}

  	return ""
  }
  194. func parseResponseBody(doer Doer, req *http.Request) (int, []byte, error) {
  195. resp, err := doer.Do(req)
  196. if err != nil {
  197. return 0, nil, err
  198. }
  199. defer resp.Body.Close()
  200. body, err := ioutil.ReadAll(resp.Body)
  201. if err != nil {
  202. return 0, nil, err
  203. }
  204. return resp.StatusCode, body, nil
  205. }