| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- package common_db
- import (
- "context"
- _ "database/sql"
- "reflect"
- "sync/atomic"
- "time"
- "git.shuncheng.lu/bigthing/gocommon/pkg/trace"
- "git.shuncheng.lu/bigthing/gocommon/pkg/conf"
- "git.shuncheng.lu/bigthing/gocommon/pkg/internal/constant"
- . "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
- _ "github.com/go-sql-driver/mysql"
- "xorm.io/xorm"
- )
- type DbConn interface {
- Close() error
- DbName() string
- NewSlaveEngine() *xorm.Engine
- NewMasterEngine() *xorm.Engine
- NewSlaveSession() *xorm.Session
- NewMasterSession() *xorm.Session
- NewSlaveCtxSession(ctx context.Context) *xorm.Session
- NewMasterCtxSession(ctx context.Context) *xorm.Session
- }
// defaultPingConnection is the fallback health-check interval, in seconds.
const defaultPingConnection = 30

// defaultRefreshConfigTime is the fallback interval, in seconds, between two
// configuration refresh checks.
const defaultRefreshConfigTime = 30
// dbConn is the package's DbConn implementation. It remembers the logical
// database name, the configuration it was last built from, and the currently
// active engine group.
type dbConn struct {
	// dbName is the name handed to NewDbConn; realDbName is the "dbname"
	// configuration entry when that entry is non-empty, otherwise dbName.
	dbName, realDbName string
	// config is the configuration the current engine group was built from.
	config map[string]string
	// engineGroup stores the active *xorm.EngineGroup. The original author's
	// benchmark note explains why it is an atomic.Value instead of a
	// lock-guarded field:
	//
	//	atomic package: BenchmarkName-8  1000000000  0.817 ns/op
	//	lock:           BenchmarkName-8    28494505  39.3  ns/op
	//
	// i.e. not the same order of magnitude. (The note appears to refer to a
	// read-write lock; the wording in the original is ambiguous.)
	engineGroup atomic.Value
}
- func (d *dbConn) getConfig(key string) string {
- return d.config[key]
- }
- func (d *dbConn) setConfig(conf map[string]string) {
- d.config = conf
- }
- func (d *dbConn) getEngineGroup() *xorm.EngineGroup {
- group := d.engineGroup.Load()
- if group == nil {
- return nil
- }
- return group.(*xorm.EngineGroup)
- }
- func (d *dbConn) setEngineGroup(group *xorm.EngineGroup) {
- d.engineGroup.Store(group)
- }
- //主库从库engine group
- func NewDbConn(dbName string) (DbConn, error) {
- conn := &dbConn{
- dbName: dbName,
- }
- err := conn.reloadEngine()
- if err != nil {
- return nil, err
- }
- // 热加载
- GoWithRecover(func() {
- conn.checkRefreshConfig()
- }, func(error interface{}) {
- Errorf("[database.checkConn] run panic find err, err=%v", err)
- })
- // 健康检测 v1.0.2.3 取消,go std driver的feature支持 自动处理断开的连接, https://github.com/go-sql-driver/mysql#features
- //go func() {
- // conn.checkConn()
- //}()
- return conn, nil
- }
- func (d *dbConn) NewSlaveEngine() *xorm.Engine {
- return d.getEngineGroup().Slave()
- }
- func (d *dbConn) NewMasterEngine() *xorm.Engine {
- return d.getEngineGroup().Master()
- }
- func (d *dbConn) NewSlaveSession() *xorm.Session {
- return d.NewSlaveEngine().NewSession()
- }
- func (d *dbConn) NewMasterSession() *xorm.Session {
- return d.NewMasterEngine().NewSession()
- }
- func (d *dbConn) NewSlaveCtxSession(ctx context.Context) *xorm.Session {
- ctx = d.withContext(ctx)
- return d.NewSlaveEngine().Context(ctx)
- }
- func (d *dbConn) NewMasterCtxSession(ctx context.Context) *xorm.Session {
- ctx = d.withContext(ctx)
- return d.NewMasterEngine().Context(ctx)
- }
- func (d *dbConn) withContext(ctx context.Context) context.Context {
- if ctx == nil {
- ctx = context.Background()
- }
- ctx = context.WithValue(ctx, constant.DbName, d.dbName)
- span, err := trace.NewMysqlExitSpan(ctx, d.dbName)
- if span != nil && err == nil {
- ctx = trace.SetMySqlExitSpan(ctx, span)
- }
- return ctx
- }
- func (d *dbConn) DbName() string {
- return d.realDbName
- }
- func (d *dbConn) Close() error {
- return closeEngine(d.getEngineGroup())
- }
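/**
Periodic connection health check. Disabled since v1.0.2.3 because the Go MySQL
driver handles broken connections automatically; the commented-out code below
is kept for reference only.
*/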
- //func (d *dbConn) checkConn() {
- // getRefreshTime := func() time.Duration {
- // return time.Second * time.Duration(String2Int64(d.getConfig("ping_connection"), defaultPingConnection))
- // }
- // timer := time.NewTimer(getRefreshTime())
- // for {
- // <-timer.C
- // func() {
- // defer func() {
- // if err := recover(); err != nil {
- // Errorf("[database.checkConn] find panic,err=%v", err)
- // }
- // }()
- // serr := d.NewSlaveSession().Ping()
- // merr := d.NewMasterSession().Ping()
- // if serr != nil || merr != nil {
- // Errorf("[database.checkConn] ping err,err=%v", serr, merr)
- // oldEngine := d.getEngineGroup()
- // err := d.reloadEngine()
- // if err != nil {
- // Errorf("[database.checkConn] reload err,err=%v", err)
- // return
- // }
- // go func() {
- // err := closeEngine(oldEngine) // 异步关闭
- // if err != nil {
- // Errorf("[database.checkConn] close err,err=%v", err)
- // }
- // return
- // }()
- // }
- // }()
- // timer.Reset(getRefreshTime())
- // }
- //}
- /**
- 定期刷新配置,查看db是否变更!!
- */
- func (d *dbConn) checkRefreshConfig() {
- getRefreshTime := func() time.Duration {
- return time.Second * time.Duration(String2Int64(d.getConfig("refresh_config_time"), defaultRefreshConfigTime))
- }
- timer := time.NewTimer(getRefreshTime())
- for {
- <-timer.C
- func() {
- defer func() {
- if err := recover(); err != nil {
- Errorf("[database.checkRefreshConfig] panic,err=%v", err)
- }
- }()
- newConf, err := getMysqlConfig(d.dbName, conf.SetAndAssertNil)
- if err != nil {
- Errorf("[database.checkRefreshConfig] getMysqlConfig err,err=%v", err)
- return
- }
- isSame := reflect.DeepEqual(d.config, newConf)
- if !isSame {
- oldEngine := d.getEngineGroup()
- if err := d.reloadEngine(); err != nil {
- Errorf("[database.checkRefreshConfig] reload err,err=%v", err)
- return
- }
- go func(oldEngine *xorm.EngineGroup) {
- defer func() {
- if err := recover(); err != nil {
- Errorf("[database.checkRefreshConfig] close engine panic, err=%v", err)
- }
- }()
- err := closeEngine(oldEngine) // 异步关闭
- if err != nil {
- Errorf("[database.checkConn] close err, err=%v", err)
- }
- }(oldEngine)
- }
- }()
- timer.Reset(getRefreshTime())
- }
- }
- func (d *dbConn) reloadEngine() error {
- group, config, err := newEngine(d.dbName)
- if err != nil {
- return err
- }
- if err := testConn(group); err != nil {
- return err
- }
- d.realDbName = If(config["dbname"] == "", d.dbName, config["dbname"]).(string)
- d.setConfig(config)
- d.setEngineGroup(group)
- return nil
- }
- /**
- 测试 engine 连接, 创建 Engine时候需要测试连接
- */
- func testConn(engine *xorm.EngineGroup) error {
- if engine == nil {
- return NewError("the engine group is nil can not test")
- }
- if engine.Slave() == nil {
- return NewError("the slave engine is nil can not test")
- }
- if err := engine.Slave().Ping(); err != nil {
- return NewError("slave engine ping find err, err: %s", err)
- }
- if engine.Master() == nil {
- return NewError("the master engine is nil can not test")
- }
- if err := engine.Master().Ping(); err != nil {
- return NewError("master engine ping find err, err: %s", err)
- }
- return nil
- }
|