| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- package agollo
- import (
- "encoding/json"
- "errors"
- "fmt"
- "io/ioutil"
- "net"
- "net/http"
- "net/url"
- )
- // https://github.com/ctripcorp/apollo/wiki/%E5%85%B6%E5%AE%83%E8%AF%AD%E8%A8%80%E5%AE%A2%E6%88%B7%E7%AB%AF%E6%8E%A5%E5%85%A5%E6%8C%87%E5%8D%97
- type ApolloClient interface {
- Notifications(configServerURL, appID, clusterName string, notifications []Notification) (int, []Notification, error)
- // 该接口会直接从数据库中获取配置,可以配合配置推送通知实现实时更新配置。
- GetConfigsFromNonCache(configServerURL, appID, cluster, namespace string, opts ...NotificationsOption) (int, *Config, error)
- // 该接口会从缓存中获取配置,适合频率较高的配置拉取请求,如简单的每30秒轮询一次配置。
- GetConfigsFromCache(configServerURL, appID, cluster, namespace string) (Configurations, error)
- }
// Notification pairs a namespace name with a notification ID.
type Notification struct {
	// NamespaceName is the namespace name, e.g. "application".
	NamespaceName string `json:"namespaceName"`
	// NotificationID is the notification ID, e.g. 107.
	NotificationID int `json:"notificationId"`
}

// Notifications is a list of Notification values that can render itself as
// JSON text.
type Notifications []Notification

// String returns the JSON encoding of n. A nil list encodes as "null". If
// encoding fails, the empty string is returned.
func (n Notifications) String() string {
	encoded, err := json.Marshal(n)
	if err != nil {
		return ""
	}
	return string(encoded)
}
// NotificationsOptions collects the optional settings produced by
// NotificationsOption values.
type NotificationsOptions struct {
	// ReleaseKey is the release key to pass along with a request.
	ReleaseKey string
}

// NotificationsOption mutates a NotificationsOptions value.
type NotificationsOption func(*NotificationsOptions)

// ReleaseKey returns a NotificationsOption that sets
// NotificationsOptions.ReleaseKey to releaseKey.
func ReleaseKey(releaseKey string) NotificationsOption {
	apply := func(target *NotificationsOptions) {
		target.ReleaseKey = releaseKey
	}
	return apply
}
- type Config struct {
- AppID string `json:"appId"` // appId: "AppTest",
- Cluster string `json:"cluster"` // cluster: "default",
- NamespaceName string `json:"namespaceName"` // namespaceName: "TEST.Namespace1",
- Configurations Configurations `json:"configurations"` // configurations: {Name: "Foo"},
- ReleaseKey string `json:"releaseKey"` // releaseKey: "20181017110222-5ce3b2da895720e8"
- }
// Doer performs a single HTTP request and returns its response. *http.Client
// satisfies it; other implementations can be supplied through WithDoer.
type Doer interface {
	Do(req *http.Request) (*http.Response, error)
}
- type apolloClient struct {
- Doer Doer
- IP string
- ConfigType string // 默认properties不需要在namespace后加后缀名,其他情况例如application.json {xml,yml,yaml,json,...}
- }
- type ApolloClientOption func(*apolloClient)
- func WithDoer(d Doer) ApolloClientOption {
- return func(a *apolloClient) {
- a.Doer = d
- }
- }
- func WithIP(ip string) ApolloClientOption {
- return func(a *apolloClient) {
- a.IP = ip
- }
- }
- func WithConfigType(configType string) ApolloClientOption {
- return func(a *apolloClient) {
- a.ConfigType = configType
- }
- }
- func NewApolloClient(opts ...ApolloClientOption) ApolloClient {
- c := &apolloClient{}
- for _, opt := range opts {
- opt(c)
- }
- if c.Doer == nil {
- c.Doer = &http.Client{
- Timeout: defaultClientTimeout, // Notifications由于服务端会hold住请求60秒,所以请确保客户端访问服务端的超时时间要大于60秒。
- }
- }
- if c.IP == "" {
- c.IP = localIP
- }
- if c.ConfigType == "" {
- c.ConfigType = defaultConfigType
- }
- return c
- }
- func (c *apolloClient) Notifications(configServerURL, appID, cluster string, notifications []Notification) (status int, result []Notification, err error) {
- configServerURL = normalizeURL(configServerURL)
- url := fmt.Sprintf("%s/notifications/v2?appId=%s&cluster=%s¬ifications=%s",
- configServerURL,
- url.QueryEscape(appID),
- url.QueryEscape(cluster),
- url.QueryEscape(Notifications(notifications).String()),
- )
- var req *http.Request
- req, err = http.NewRequest("GET", url, nil)
- if err != nil {
- return
- }
- var body []byte
- status, body, err = parseResponseBody(c.Doer, req)
- if err != nil {
- return
- }
- if status == http.StatusOK {
- err = json.Unmarshal(body, &result)
- return
- }
- return
- }
- func (c *apolloClient) GetConfigsFromNonCache(configServerURL, appID, cluster, namespace string, opts ...NotificationsOption) (status int, config *Config, err error) {
- var options = NotificationsOptions{}
- for _, opt := range opts {
- opt(&options)
- }
- configServerURL = normalizeURL(configServerURL)
- url := fmt.Sprintf("%s/configs/%s/%s/%s?releaseKey=%s&ip=%s",
- configServerURL,
- url.QueryEscape(appID),
- url.QueryEscape(cluster),
- url.QueryEscape(c.getNamespace(namespace)),
- options.ReleaseKey,
- c.IP,
- )
- var req *http.Request
- req, err = http.NewRequest("GET", url, nil)
- if err != nil {
- return
- }
- var body []byte
- status, body, err = parseResponseBody(c.Doer, req)
- if err != nil {
- return
- }
- if status == http.StatusOK {
- config = new(Config)
- err = json.Unmarshal(body, config)
- return
- }
- return
- }
- func (c *apolloClient) GetConfigsFromCache(configServerURL, appID, cluster, namespace string) (config Configurations, err error) {
- configServerURL = normalizeURL(configServerURL)
- url := fmt.Sprintf("%s/configfiles/json/%s/%s/%s?ip=%s",
- configServerURL,
- url.QueryEscape(appID),
- url.QueryEscape(cluster),
- url.QueryEscape(c.getNamespace(namespace)),
- c.IP,
- )
- var req *http.Request
- req, err = http.NewRequest("GET", url, nil)
- if err != nil {
- return
- }
- var (
- body []byte
- status int
- )
- status, body, err = parseResponseBody(c.Doer, req)
- if err != nil {
- return
- }
- if status == http.StatusOK {
- config = make(Configurations)
- err = json.Unmarshal(body, &config)
- } else {
- err = errors.New(string(body))
- }
- return
- }
- // 配置文件有多种格式,例如:properties、xml、yml、yaml、json等。同样Namespace也具有这些格式。在Portal UI中可以看到“application”的Namespace上有一个“properties”标签,表明“application”是properties格式的。
- // 如果使用Http接口直接调用时,对应的namespace参数需要传入namespace的名字加上后缀名,如datasources.json。
- func (c *apolloClient) getNamespace(namespace string) string {
- if c.ConfigType == "" || c.ConfigType == defaultConfigType {
- return namespace
- }
- return namespace + "." + c.ConfigType
- }
// getLocalIP returns the string form of the first non-loopback IPv4 address
// reported by net.InterfaceAddrs. It returns "" when the addresses cannot be
// listed or when none of them qualifies.
func getLocalIP() string {
	ifaceAddrs, err := net.InterfaceAddrs()
	if err != nil {
		return ""
	}

	for _, addr := range ifaceAddrs {
		ipNet, isIPNet := addr.(*net.IPNet)
		// Skip entries that are not IP networks, and loopback addresses.
		if !isIPNet || ipNet.IP.IsLoopback() {
			continue
		}
		// To4 returns nil for addresses that are not IPv4.
		if ipNet.IP.To4() == nil {
			continue
		}
		return ipNet.IP.String()
	}

	return ""
}
- func parseResponseBody(doer Doer, req *http.Request) (int, []byte, error) {
- resp, err := doer.Do(req)
- if err != nil {
- return 0, nil, err
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return 0, nil, err
- }
- return resp.StatusCode, body, nil
- }
|