package agollo import ( "encoding/json" "io/ioutil" "net/http" "os" "sync" "time" ) var ( localIP = getLocalIP() defaultConfigFilePath = "app.properties" defaultCluster = "default" defaultNamespace = "application" defaultConfigType = "properties" defaultBackupFile = ".agollo" defaultClientTimeout = 90 * time.Second defaultNotificationID = -1 defaultLongPollInterval = 1 * time.Second defaultAutoFetchOnCacheMiss = false defaultFailTolerantOnBackupExists = false defaultWatchTimeout = 500 * time.Millisecond watchLock sync.RWMutex timeZone = time.FixedZone("CST", 8*3600) defaultAgollo Agollo ) type Agollo interface { Start() <-chan *LongPollerError Stop() Reload() error Get(key string, opts ...GetOption) string GetNameSpace(namespace string) Configurations GetAll() map[string]interface{} Watch() <-chan *ApolloResponse WatchNamespace(namespace string, stop chan bool) <-chan *ApolloResponse Options() Options } type Configurations map[string]interface{} type ApolloResponse struct { Namespace string OldValue Configurations NewValue Configurations Error error } type LongPollerError struct { ConfigServerURL string AppID string Cluster string Notifications []Notification Namespace string // 服务响应200后去非缓存接口拉取时的namespace Err error } type agollo struct { opts Options notificationMapLock sync.Mutex notificationMap map[string]int // key: namespace value: notificationId namespaceMapLock sync.Mutex namespaceMap map[string]string // key: namespace value: releaseKey cacheLock sync.Mutex cache map[string]interface{} // key: namespace value: Configurations watchCh chan *ApolloResponse // watch all namespace watchNamespaceChMap map[string]chan *ApolloResponse // key: namespace value: chan *ApolloResponse errorsCh chan *LongPollerError runOnce sync.Once stop bool stopCh chan struct{} stopLock sync.Mutex } func New(configServerURL, appID string, opts ...Option) (Agollo, error) { a := &agollo{ stopCh: make(chan struct{}), errorsCh: make(chan *LongPollerError), opts: newOptions(opts...), } a.opts.ConfigServerURL = normalizeURL(configServerURL) a.opts.AppID = appID return a.preload() } func NewWithConfigFile(configFilePath string, opts ...Option) (Agollo, error) { f, err := os.Open(configFilePath) if err != nil { return nil, err } defer f.Close() var conf struct { AppID string `json:"appId,omitempty"` Cluster string `json:"cluster,omitempty"` IP string `json:"ip,omitempty"` } if err := json.NewDecoder(f).Decode(&conf); err != nil { return nil, err } return New( conf.IP, conf.AppID, append( []Option{ Cluster(conf.Cluster), }, opts..., )..., ) } func (a *agollo) preload() (Agollo, error) { for _, namespace := range a.opts.PreloadNamespaces { _, err := a.loadConfigFromNonCache(namespace) if err != nil { if a.opts.FailTolerantOnBackupExists { _, err = a.loadBackup(namespace) if err != nil { return nil, err } continue } return nil, err } } return a, nil } func (a *agollo) Reload() error { err := a.loadAllConfigFromNonCacheAndThenCache() if err == nil { return nil } //a.log("ReloadAllNamespace", "From api", "Error", err.Error()) err = a.loadAllFromBackup() if err != nil { return err } return nil } func (a *agollo) GetAll() map[string]interface{} { return a.cache } func (a *agollo) Get(key string, opts ...GetOption) string { getOpts := newGetOptions( append( []GetOption{ WithNamespace(a.opts.DefaultNamespace), }, opts..., )..., ) val, found := a.GetNameSpace(getOpts.Namespace)[key] if !found { return getOpts.DefaultValue } v, _ := ToStringE(val) return v } func (a *agollo) GetNameSpace(namespace string) Configurations { if len(a.cache) == 0 { a.cache = make(map[string]interface{}) } //LoadOrStore 函数的意义就是当前没有关系的时候 绑定关系 有的话返回关系 configs, val := a.cache[namespace] if val == false { a.cache[namespace] = Configurations{} configs = Configurations{} return a.loadNameSpace(namespace) } else { configs = a.cache[namespace] } //a.log("Namesapce", namespace, "From", "cache") return configs.(Configurations) } func (a *agollo) loadNameSpace(namespace string) Configurations { // 存储不存在的namespace, 之后在longPoller中拉取配置 a.notificationMapLock.Lock() defer a.notificationMapLock.Unlock() if len(a.notificationMap) == 0 { a.notificationMap = make(map[string]int) } _, val := a.notificationMap[namespace] if val == false { a.notificationMap[namespace] = defaultNotificationID } if a.opts.AutoFetchOnCacheMiss { configs, err := a.loadConfigFromCache(namespace) if err == nil { //a.log("Namesapce", namespace, "From", "cache-api") return configs } } if a.opts.FailTolerantOnBackupExists { configs, err := a.loadBackup(namespace) if err == nil { a.log("Namesapce", namespace, "From", "local") return configs } } return Configurations{} } func (a *agollo) Options() Options { return a.opts } func (a *agollo) Watch() <-chan *ApolloResponse { watchLock.Lock() defer watchLock.Unlock() if a.watchCh == nil { a.watchCh = make(chan *ApolloResponse) } return a.watchCh } func (a *agollo) WatchNamespace(namespace string, stop chan bool) <-chan *ApolloResponse { watchCh, val := a.watchNamespaceChMap[namespace] if val == false { go func(stop chan bool) { select { case <-stop: delete(a.watchNamespaceChMap, namespace) //a.watchNamespaceChMap.Delete(namespace) } }(stop) watchCh = make(chan *ApolloResponse) } return watchCh } func (a *agollo) sendWatchCh(namespace string, oldVal, newVal Configurations) { resp := &ApolloResponse{ Namespace: namespace, OldValue: oldVal, NewValue: newVal, } timer := time.NewTimer(defaultWatchTimeout) for _, watchCh := range a.getWatchChs(namespace) { select { case watchCh <- resp: case <-timer.C: // 防止创建全局监听或者某个namespace监听却不消费死锁问题 timer.Reset(defaultWatchTimeout) } } } func (a *agollo) getWatchChs(namespace string) []chan *ApolloResponse { var chs []chan *ApolloResponse watchLock.Lock() defer watchLock.Unlock() if a.watchCh != nil { chs = append(chs, a.watchCh) } if watchNamespaceCh, found := a.watchNamespaceChMap[namespace]; found { chs = append(chs, watchNamespaceCh) } return chs } func (a *agollo) sendErrorsCh(notifications []Notification, namespace string, err error) { longPollerError := &LongPollerError{ ConfigServerURL: a.opts.ConfigServerURL, AppID: a.opts.AppID, Cluster: a.opts.Cluster, Notifications: notifications, Namespace: namespace, Err: err, } select { case a.errorsCh <- longPollerError: default: } } func (a *agollo) log(kvs ...interface{}) { timeStr := time.Now().In(timeZone).Format("2006-01-02 15:04:05") a.opts.Logger.Log( append([]interface{}{ "[" + timeStr + "]", "[Agollo]", "ConfigServerUrl", a.opts.ConfigServerURL, "AppID", a.opts.AppID, "Cluster", a.opts.Cluster, }, kvs..., )..., ) } func (a *agollo) loadConfigFromCache(namespace string) (configurations Configurations, err error) { configurations, err = a.opts.ApolloClient.GetConfigsFromCache( a.opts.ConfigServerURL, a.opts.AppID, a.opts.Cluster, namespace) if err != nil { a.log("Namespace", namespace, "Action", "LoadConfigFromCache", "Error", err.Error()) return } err = a.handleConfig(namespace, configurations) return } func (a *agollo) loadConfigFromNonCache(namespace string) (configurations Configurations, err error) { var ( status int config *Config //cachedReleaseKey, _ = a.namespaceMap.LoadOrStore(namespace, "") ) a.namespaceMapLock.Lock() defer a.namespaceMapLock.Unlock() if len(a.namespaceMap) == 0 { a.namespaceMap = make(map[string]string) } cachedReleaseKey, val := a.namespaceMap[namespace] if val == false { a.namespaceMap[namespace] = "" cachedReleaseKey = "" } else { cachedReleaseKey = a.namespaceMap[namespace] } status, config, err = a.opts.ApolloClient.GetConfigsFromNonCache( a.opts.ConfigServerURL, a.opts.AppID, a.opts.Cluster, namespace, ReleaseKey(cachedReleaseKey), ) if err != nil { a.log("Namespace", namespace, "Action", "LoadConfigFromNonCache", "Error", err.Error()) return } if status == http.StatusOK { configurations = config.Configurations a.namespaceMap[namespace] = config.ReleaseKey err = a.handleConfig(namespace, config.Configurations) return } return } func (a *agollo) loadAllConfigFromNonCacheAndThenCache() (err error) { var ( status int config *Config //cachedReleaseKey, _ = a.namespaceMap.LoadOrStore(namespace, "") ) a.namespaceMapLock.Lock() defer a.namespaceMapLock.Unlock() if len(a.cache) == 0 { a.cache = make(map[string]interface{}) } for _, namespace := range a.opts.PreloadNamespaces { if len(a.namespaceMap) == 0 { a.namespaceMap = make(map[string]string) } cachedReleaseKey, val := a.namespaceMap[namespace] if val == false { a.namespaceMap[namespace] = "" cachedReleaseKey = "" } else { cachedReleaseKey = a.namespaceMap[namespace] } status, config, err = a.opts.ApolloClient.GetConfigsFromNonCache( a.opts.ConfigServerURL, a.opts.AppID, a.opts.Cluster, namespace, ReleaseKey(cachedReleaseKey), ) if err != nil { a.log("Namespace", namespace, "Action", "LoadConfigFromNonCache", "Error", err.Error()) return err } if status == http.StatusOK { configurations := config.Configurations a.namespaceMap[namespace] = config.ReleaseKey a.cacheLock.Lock() a.cache[namespace] = configurations a.cacheLock.Unlock() } } //a.log("ReloadAllNamespace", "From api") return a.backup() } func (a *agollo) handleConfig(namespace string, configurations Configurations) error { // 读取旧缓存用来给监听队列 oldValue := a.GetNameSpace(namespace) // 覆盖旧缓存 a.cacheLock.Lock() a.cache[namespace] = configurations a.cacheLock.Unlock() // 发送到监听channel a.sendWatchCh(namespace, oldValue, configurations) // 备份配置 return a.backup() } func (a *agollo) backup() error { backup := map[string]Configurations{} a.cacheLock.Lock() defer a.cacheLock.Unlock() for key, val := range a.cache { conf, _ := val.(Configurations) backup[key] = conf } data, err := json.Marshal(backup) if err != nil { return err } return ioutil.WriteFile(a.opts.BackupFile, data, 0644) } func (a *agollo) loadAllFromBackup() error { if len(a.cache) == 0 { a.cache = make(map[string]interface{}) } if _, err := os.Stat(a.opts.BackupFile); err != nil { return err } data, err := ioutil.ReadFile(a.opts.BackupFile) if err != nil { return err } backup := map[string]Configurations{} err = json.Unmarshal(data, &backup) if err != nil { return err } a.cacheLock.Lock() defer a.cacheLock.Unlock() for _, specifyNamespace := range a.opts.PreloadNamespaces { for namespace, configs := range backup { if namespace == specifyNamespace { a.cache[namespace] = configs } } } a.log("ReloadAllNamesapce", "From", "Backup") return nil } func (a *agollo) loadBackup(specifyNamespace string) (Configurations, error) { if _, err := os.Stat(a.opts.BackupFile); err != nil { return nil, nil } data, err := ioutil.ReadFile(a.opts.BackupFile) if err != nil { return nil, err } backup := map[string]Configurations{} err = json.Unmarshal(data, &backup) if err != nil { return nil, err } for namespace, configs := range backup { if namespace == specifyNamespace { if len(a.cache) == 0 { a.cache = make(map[string]interface{}) } a.cache[namespace] = configs return configs, nil } } return nil, nil } func (a *agollo) longPoll() { notifications := a.notifications() status, notifications, err := a.opts.ApolloClient.Notifications( a.opts.ConfigServerURL, a.opts.AppID, a.opts.Cluster, notifications, ) if err != nil { a.log("Notifications", Notifications(a.notifications()).String(), "Error", err.Error(), "Action", "LongPoll") a.sendErrorsCh(notifications, "", err) } if status == http.StatusOK { // 服务端判断没有改变,不会返回结果,这个时候不需要修改,遍历空数组跳过 for _, notification := range notifications { _, err = a.loadConfigFromNonCache(notification.NamespaceName) if err == nil { a.notificationMapLock.Lock() a.notificationMap[notification.NamespaceName] = notification.NotificationID a.notificationMapLock.Unlock() continue } else { a.sendErrorsCh(notifications, notification.NamespaceName, err) } } } } func (a *agollo) notifications() []Notification { var notifications []Notification a.notificationMapLock.Lock() for key, val := range a.notificationMap { notifications = append(notifications, Notification{ NamespaceName: key, NotificationID: val, }) } a.notificationMapLock.Unlock() return notifications } // 启动goroutine去轮训apollo通知接口 func (a *agollo) Start() <-chan *LongPollerError { a.runOnce.Do(func() { go func() { timer := time.NewTimer(a.opts.LongPollerInterval) defer timer.Stop() for !a.shouldStop() { select { case <-timer.C: a.longPoll() timer.Reset(a.opts.LongPollerInterval) case <-a.stopCh: return } } }() }) return a.errorsCh } func (a *agollo) Stop() { a.stopLock.Lock() defer a.stopLock.Unlock() if a.stop { return } a.stop = true close(a.stopCh) } func (a *agollo) shouldStop() bool { select { case <-a.stopCh: return true default: return false } } func Init(configServerURL, appID string, opts ...Option) (err error) { defaultAgollo, err = New(configServerURL, appID, opts...) return } func InitWithConfigFile(configFilePath string, opts ...Option) (err error) { defaultAgollo, err = NewWithConfigFile(configFilePath, opts...) return } func InitWithDefaultConfigFile(opts ...Option) error { return InitWithConfigFile(defaultConfigFilePath, opts...) } func Start() <-chan *LongPollerError { return defaultAgollo.Start() } func Stop() { defaultAgollo.Stop() } func Reload() error { return defaultAgollo.Reload() } func GetAll() map[string]interface{} { return defaultAgollo.GetAll() } func Get(key string, opts ...GetOption) string { return defaultAgollo.Get(key, opts...) } func GetNameSpace(namespace string) Configurations { return defaultAgollo.GetNameSpace(namespace) } func Watch() <-chan *ApolloResponse { return defaultAgollo.Watch() } func WatchNamespace(namespace string, stop chan bool) <-chan *ApolloResponse { return defaultAgollo.WatchNamespace(namespace, stop) } func GetAgollo() Agollo { return defaultAgollo }