package localcache import ( "context" "sync" "time" "git.shuncheng.lu/bigthing/gocommon/pkg/conf" "git.shuncheng.lu/bigthing/gocommon/pkg/logger" ) const ( defaultRefreshTime int64 = 600 defaultCacheName = "localcache" ) type Cache interface { Get(key interface{}) interface{} Put(key interface{}, value interface{}) Delete(key interface{}) KeySet() []interface{} Refresh() } // 用于刷新任务的cache,不能替换cache无效,比如cache=cache1,刷新任务最好recover一下,防止程序出问题 type Refresh func(context context.Context, cache Cache) type Option func(cache *localCache) type cacheEntry struct { // 这个name主要是用来设置ttl时间,直接读取config cacheName string // 普通map 并发读写会抛出 fatal error: concurrent map read and map write // 读多写少,使用sync-map cache sync.Map // 刷新缓存的脚本 refresh Refresh // 刷新时间 refreshTime time.Duration } // 对外暴漏的接口 func (this *cacheEntry) Put(key interface{}, value interface{}) { this.cache.Store(key, value) } func (this *cacheEntry) Delete(key interface{}) { this.cache.Delete(key) } func (this *cacheEntry) Get(key interface{}) interface{} { value, _ := this.cache.Load(key) return value } func (this *cacheEntry) KeySet() []interface{} { keys := make([]interface{}, 0) this.cache.Range(func(key, value interface{}) bool { keys = append(keys, key) return true }) return keys } type localCache struct { localCacheName string lock sync.Mutex entries map[string]*cacheEntry start bool defaultRefreshTime int64 //单位s } // 实例化 func NewLocalCache(option ...Option) *localCache { cache := new(localCache) for _, e := range option { if e != nil { e(cache) } } if cache.localCacheName == "" { cache.localCacheName = defaultCacheName } if cache.entries == nil { cache.entries = map[string]*cacheEntry{} } if cache.defaultRefreshTime == 0 { cache.defaultRefreshTime = defaultRefreshTime } cache.start = false return cache } // 获取数据,不返回bool,是因为业务中只需要判断nil func (this *localCache) Get(cacheName string, keyName interface{}) interface{} { entry, isExist := this.entries[cacheName] if !isExist || entry == nil { return nil } return entry.Get(keyName) } // 注册 func (this *localCache) Register(configName string, refresh Refresh) { this.lock.Lock() defer this.lock.Unlock() if this.start { return } cache, isExist := this.entries[configName] if isExist || cache != nil { return } entry := cacheEntry{ cacheName: configName, refresh: refresh, refreshTime: this.getRefreshTime(configName), } this.entries[configName] = &entry } func (this *localCache) Start() { this.lock.Lock() defer this.lock.Unlock() if this.start { return } // 同步初始化数据 for index, _ := range this.entries { this.entries[index].Refresh() } // goroutine 跑需要recover for _, elem := range this.entries { go func(job *cacheEntry) { timer := time.NewTimer(job.refreshTime) for { <-timer.C job.Refresh() job.refreshTime = this.getRefreshTime(job.cacheName) timer.Reset(job.refreshTime) } }(elem) } this.start = true } func (this *cacheEntry) Refresh() { defer func() { if p := recover(); p != nil { logger.Errorf("[local-cache] cache-job has panic,cache-name=%s,panic_info=%v", this.cacheName, p) } }() start := time.Now() ctx := logger.NewTraceIdContext() logger.Infoc(ctx, "[local-cache] %s start, ttl=%.0fs", this.cacheName, this.refreshTime.Seconds()) this.refresh(ctx, this) logger.Infoc(ctx, "[local-cache] %s end, ttl=%.0fs,refresh_spend=%fs", this.cacheName, this.refreshTime.Seconds(), time.Now().Sub(start).Seconds()) } func (this *localCache) getRefreshTime(cacheName string) time.Duration { return time.Duration(conf.GetInt64(this.localCacheName, cacheName, this.defaultRefreshTime)) * time.Second }