package common_db import ( "context" _ "database/sql" "reflect" "sync/atomic" "time" "git.shuncheng.lu/bigthing/gocommon/pkg/trace" "git.shuncheng.lu/bigthing/gocommon/pkg/conf" "git.shuncheng.lu/bigthing/gocommon/pkg/internal/constant" . "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util" _ "github.com/go-sql-driver/mysql" "xorm.io/xorm" ) type DbConn interface { Close() error DbName() string NewSlaveEngine() *xorm.Engine NewMasterEngine() *xorm.Engine NewSlaveSession() *xorm.Session NewMasterSession() *xorm.Session NewSlaveCtxSession(ctx context.Context) *xorm.Session NewMasterCtxSession(ctx context.Context) *xorm.Session } const ( defaultPingConnection = 30 defaultRefreshConfigTime = 30 ) type dbConn struct { dbName string realDbName string config map[string]string /** // 原子包的速度 BenchmarkName-8 1000000000 0.817 ns/op // 速写锁的速度 BenchmarkName-8 28494505 39.3 ns/op // 完全不是一个量级 */ engineGroup atomic.Value } func (d *dbConn) getConfig(key string) string { return d.config[key] } func (d *dbConn) setConfig(conf map[string]string) { d.config = conf } func (d *dbConn) getEngineGroup() *xorm.EngineGroup { group := d.engineGroup.Load() if group == nil { return nil } return group.(*xorm.EngineGroup) } func (d *dbConn) setEngineGroup(group *xorm.EngineGroup) { d.engineGroup.Store(group) } //主库从库engine group func NewDbConn(dbName string) (DbConn, error) { conn := &dbConn{ dbName: dbName, } err := conn.reloadEngine() if err != nil { return nil, err } // 热加载 GoWithRecover(func() { conn.checkRefreshConfig() }, func(error interface{}) { Errorf("[database.checkConn] run panic find err, err=%v", err) }) // 健康检测 v1.0.2.3 取消,go std driver的feature支持 自动处理断开的连接, https://github.com/go-sql-driver/mysql#features //go func() { // conn.checkConn() //}() return conn, nil } func (d *dbConn) NewSlaveEngine() *xorm.Engine { return d.getEngineGroup().Slave() } func (d *dbConn) NewMasterEngine() *xorm.Engine { return d.getEngineGroup().Master() } func (d *dbConn) NewSlaveSession() *xorm.Session { return d.NewSlaveEngine().NewSession() } func (d *dbConn) NewMasterSession() *xorm.Session { return d.NewMasterEngine().NewSession() } func (d *dbConn) NewSlaveCtxSession(ctx context.Context) *xorm.Session { ctx = d.withContext(ctx) return d.NewSlaveEngine().Context(ctx) } func (d *dbConn) NewMasterCtxSession(ctx context.Context) *xorm.Session { ctx = d.withContext(ctx) return d.NewMasterEngine().Context(ctx) } func (d *dbConn) withContext(ctx context.Context) context.Context { if ctx == nil { ctx = context.Background() } ctx = context.WithValue(ctx, constant.DbName, d.dbName) span, err := trace.NewMysqlExitSpan(ctx, d.dbName) if span != nil && err == nil { ctx = trace.SetMySqlExitSpan(ctx, span) } return ctx } func (d *dbConn) DbName() string { return d.realDbName } func (d *dbConn) Close() error { return closeEngine(d.getEngineGroup()) } /** 定期检测!健康!! */ //func (d *dbConn) checkConn() { // getRefreshTime := func() time.Duration { // return time.Second * time.Duration(String2Int64(d.getConfig("ping_connection"), defaultPingConnection)) // } // timer := time.NewTimer(getRefreshTime()) // for { // <-timer.C // func() { // defer func() { // if err := recover(); err != nil { // Errorf("[database.checkConn] find panic,err=%v", err) // } // }() // serr := d.NewSlaveSession().Ping() // merr := d.NewMasterSession().Ping() // if serr != nil || merr != nil { // Errorf("[database.checkConn] ping err,err=%v", serr, merr) // oldEngine := d.getEngineGroup() // err := d.reloadEngine() // if err != nil { // Errorf("[database.checkConn] reload err,err=%v", err) // return // } // go func() { // err := closeEngine(oldEngine) // 异步关闭 // if err != nil { // Errorf("[database.checkConn] close err,err=%v", err) // } // return // }() // } // }() // timer.Reset(getRefreshTime()) // } //} /** 定期刷新配置,查看db是否变更!! */ func (d *dbConn) checkRefreshConfig() { getRefreshTime := func() time.Duration { return time.Second * time.Duration(String2Int64(d.getConfig("refresh_config_time"), defaultRefreshConfigTime)) } timer := time.NewTimer(getRefreshTime()) for { <-timer.C func() { defer func() { if err := recover(); err != nil { Errorf("[database.checkRefreshConfig] panic,err=%v", err) } }() newConf, err := getMysqlConfig(d.dbName, conf.SetAndAssertNil) if err != nil { Errorf("[database.checkRefreshConfig] getMysqlConfig err,err=%v", err) return } isSame := reflect.DeepEqual(d.config, newConf) if !isSame { oldEngine := d.getEngineGroup() if err := d.reloadEngine(); err != nil { Errorf("[database.checkRefreshConfig] reload err,err=%v", err) return } go func(oldEngine *xorm.EngineGroup) { defer func() { if err := recover(); err != nil { Errorf("[database.checkRefreshConfig] close engine panic, err=%v", err) } }() err := closeEngine(oldEngine) // 异步关闭 if err != nil { Errorf("[database.checkConn] close err, err=%v", err) } }(oldEngine) } }() timer.Reset(getRefreshTime()) } } func (d *dbConn) reloadEngine() error { group, config, err := newEngine(d.dbName) if err != nil { return err } if err := testConn(group); err != nil { return err } d.realDbName = If(config["dbname"] == "", d.dbName, config["dbname"]).(string) d.setConfig(config) d.setEngineGroup(group) return nil } /** 测试 engine 连接, 创建 Engine时候需要测试连接 */ func testConn(engine *xorm.EngineGroup) error { if engine == nil { return NewError("the engine group is nil can not test") } if engine.Slave() == nil { return NewError("the slave engine is nil can not test") } if err := engine.Slave().Ping(); err != nil { return NewError("slave engine ping find err, err: %s", err) } if engine.Master() == nil { return NewError("the master engine is nil can not test") } if err := engine.Master().Ping(); err != nil { return NewError("master engine ping find err, err: %s", err) } return nil }