切換語言為:簡體

Go-Redsync包底層原始碼實現

  • 爱糖宝
  • 2024-06-27
  • 2079
  • 0
  • 0

背景

redsync包是一個基於 Redis 實現的分散式鎖的 Go 語言庫。它基於 Redlock 演算法,可以在多個 Redis 節點上實現一個具有容錯性的分散式鎖。

包內實現了,重試機制(獲取鎖),支援多個reids節點,分散式鎖,續租等功能。

github.com/go-redsync/…

Redlock (紅鎖)演算法:是在多個Redis節點之間實現鎖機制。它採用主節點過半機制,即獲取鎖或釋放鎖成功的標誌是在過半的節點上操作成功。例如,如果有5個Redis主節點,那麼至少需要在3個節點上成功獲取鎖纔算成功。這種機制有效地解決了單點失敗的問題,並提高了系統的穩定性和可靠性。

使用

下面是初始化,加鎖,釋放鎖使用例子:

package main

import (
        "context"
        "fmt"
        "log"
        "time"

        "github.com/go-redsync/redsync/v4"
        "github.com/go-redsync/redsync/v4/redis/goredis/v8"
        "github.com/go-redis/redis/v8"
)

func main() {
        // 建立 Redis 客戶端連線
        redisClient := redis.NewClient(&redis.Options{
                Addr: "localhost:6379", // Redis 服務地址
                DB:   0,                // 使用預設的資料庫編號
        })

        // 使用 Redsync 建立一個分散式鎖的例項
        pool := goredis.NewPool(redisClient) // 建立連線池
        rs := redsync.New(pool)

        // 定義鎖的名稱
        lockName := "my_distributed_lock"

        // 建立一個鎖物件
        lock := rs.NewMutex(lockName, redsync.WithTTL(10*time.Second))

        // 嘗試獲取鎖
        if err := lock.Lock(); err != nil {
                log.Fatal(err)
        }

        fmt.Println("Lock acquired")

        // 執行業務邏輯
        // ...

        // 模擬業務邏輯處理時間
        time.Sleep(5 * time.Second)

        // 完成業務邏輯後釋放鎖
        if err := lock.Unlock(); err != nil {
                log.Fatal(err)
        }

        fmt.Println("Lock released")
}


看看它的原始碼實現

實現

分散式鎖的實現原理是採用redis的setnx +過期時間來實現的;

運用Redlock演算法來解決多個redis,獲取釋放鎖成功是否的判斷依據;

初始化

透過Mutex結構來構建實現流程的引數配置,下面可以看見Mutex配置的含義,可以看到鎖的過期時間預設為8秒

func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
    m := &Mutex{
       name:   name,
       expiry: 8 * time.Second,
       tries:  32,
       delayFunc: func(tries int) time.Duration {
          return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
       },
       genValueFunc:  genValue,
       driftFactor:   0.01,
       timeoutFactor: 0.05,
       quorum:        len(r.pools)/2 + 1,
       pools:         r.pools,
    }
    for _, o := range options {
       o.Apply(m)
    }
    if m.shuffle {
       randomPools(m.pools)
    }
    return m
}

type Mutex struct {
        // name 是鎖的名稱,用於標識不同的鎖例項。
        name string
        // expiry 是鎖的過期時間,即鎖在被自動釋放之前的有效時間。
        expiry time.Duration
        // tries 是嘗試獲取鎖的次數,用於重試機制。
        tries int
        // delayFunc 是延遲函式,用於在嘗試獲取鎖之間等待一段時間。
        delayFunc DelayFunc
        // driftFactor 是時鐘漂移因子,用於計算鎖的最小有效時間。
        driftFactor float64
        // timeoutFactor 是超時因子,用於計算在嘗試獲取鎖時的最大等待時間。
        timeoutFactor float64
        // quorum 是最小鎖數量,即在多個 Redis 例項中需要成功獲取鎖的最小數量。
        quorum int
        // genValueFunc 是生成值的函式,用於生成唯一的鎖值。
        //每一次加鎖都會生成一個隨機數,加鎖成功會附值到value上,
        //當釋放鎖的時候會對值進行對比,防止誤刪的情況
        genValueFunc func() (string, error)
        //每次加鎖成功的唯一值
        value string
        // until 是鎖的有效期截止時間。
        until time.Time
        // shuffle 表示是否在嘗試獲取鎖之前對 Redis 例項進行隨機排序。
        shuffle bool
        // failFast 表示是否在第一次獲取鎖失敗時就立即返回錯誤。
        failFast bool
        // setNXOnExtend 表示在擴充套件鎖的過期時間時是否使用 SETNX 命令。
        setNXOnExtend bool
        // pools 是 Redis 連線池的切片,用於儲存與多個 Redis 例項的連線。
        pools []redis.Pool
}


加鎖

透過下面TryLock()方法去嘗試1次加鎖或透過Lock()方法去使用配置的重試次數去加鎖:

// TryLock only attempts to lock m once and returns immediately regardless of success or failure without retrying.
func (m *Mutex) TryLock() error {
    return m.TryLockContext(context.Background())
}

// TryLockContext only attempts to lock m once and returns immediately regardless of success or failure without retrying.
func (m *Mutex) TryLockContext(ctx context.Context) error {
    return m.lockContext(ctx, 1)
}

// Lock locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
func (m *Mutex) Lock() error {
    return m.LockContext(context.Background())
}

// LockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
func (m *Mutex) LockContext(ctx context.Context) error {
    return m.lockContext(ctx, m.tries)


不管是那種方法,核心的方法流程是在lockContext方法裡處理:

// lockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
func (m *Mutex) lockContext(ctx context.Context, tries int) error {
    if ctx == nil {
       ctx = context.Background()
    }

    value, err := m.genValueFunc()
    if err != nil {
       return err
    }

    var timer *time.Timer
    for i := 0; i < tries; i++ {
       if i != 0 {
          if timer == nil {
             timer = time.NewTimer(m.delayFunc(i))
          } else {
             timer.Reset(m.delayFunc(i))
          }
          
          //防止要防止頻繁重試,讓重試擁有一個延期時間
          select {
          case <-ctx.Done():
             timer.Stop()
             // Exit early if the context is done.
             return ErrFailed
          case <-timer.C:
             // Fall-through when the delay timer completes.
          }
       }

       start := time.Now()

       //多個reids進行加鎖
       n, err := func() (int, error) {
          ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
          defer cancel()
          return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
             return m.acquire(ctx, pool, value)
          })
       }()

       //判斷加鎖成功節點是否大於一半節點數並且是否超過鎖的過期時間
       now := time.Now()
       until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
       if n >= m.quorum && now.Before(until) {
          m.value = value
          m.until = until
          return nil
       }
       
       //加鎖失敗,需要把每個redis中加鎖成功的節點釋放鎖
       _, _ = func() (int, error) {
          ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
          defer cancel()
          return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
             return m.release(ctx, pool, value)
          })
       }()
       if i == m.tries-1 && err != nil {
          return err
       }
    }

    return ErrFailed
}


實現流程:

  1. 檢查上下文:首先檢查傳入的 context.Context 是否為 nil,如果是,則使用一個空的背景上下文 context.Background()

  2. 生成鎖的值:呼叫 m.genValueFunc() 生成一個唯一的鎖值,這個值用於在多個 Redis 例項上設定鎖。

  3. 重試迴圈:進入一個迴圈,嘗試 tries 次獲取鎖。如果在第一次之外的重試,會根據重試次數 i 使用 m.delayFunc(i) 計算延遲時間,並等待這個延遲時間。

  4. 上下文超時檢查:在每次重試之前,檢查上下文是否已取消(ctx.Done()),如果是,則停止計時器並返回錯誤。

  5. 執行鎖操作:呼叫一個匿名函式,該函式在超時上下文中嘗試在所有 Redis 例項上非同步執行鎖操作。這個操作是透過 m.actOnPoolsAsync() 方法實現的,它內部會呼叫 m.acquire() 方法嘗試獲取鎖。

  6. 計算鎖的有效期:如果成功在大多數 Redis 例項上獲取到鎖,並且從開始嘗試獲取鎖到當前時間的時間差加上時鐘漂移因子計算出的有效期還沒過期,則認為獲取鎖成功。

  7. 設定鎖的值和有效期:如果鎖獲取成功,設定 Mutex 結構體的 valueuntil 欄位。

  8. 釋放鎖:如果在嘗試過程中鎖獲取失敗,會呼叫 m.release() 方法嘗試釋放所有 Redis 例項上的鎖。

  9. 錯誤處理:如果在最後一次嘗試後仍然失敗,並且發生了錯誤,則返回這個錯誤。

在第5步執行鎖的操作的程式碼流程:

func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
    type result struct {
       node     int
       statusOK bool
       err      error
    }

    ch := make(chan result, len(m.pools))
    for node, pool := range m.pools {
       go func(node int, pool redis.Pool) {
          r := result{node: node}
          //加鎖操作
          r.statusOK, r.err = actFn(pool)
          ch <- r
       }(node, pool)
    }

    var (
       n     = 0
       taken []int
       err   error
    )

    for range m.pools {
       r := <-ch
       if r.statusOK {
          n++
       } else if r.err == ErrLockAlreadyExpired {
          err = multierror.Append(err, ErrLockAlreadyExpired)
       } else if r.err != nil {
          err = multierror.Append(err, &RedisError{Node: r.node, Err: r.err})
       } else {
          taken = append(taken, r.node)
          err = multierror.Append(err, &ErrNodeTaken{Node: r.node})
       }

       if m.failFast {
          // fast retrun
          if n >= m.quorum {
             return n, err
          }

          // fail fast
          if len(taken) >= m.quorum {
             return n, &ErrTaken{Nodes: taken}
          }
       }
    }

    if len(taken) >= m.quorum {
       return n, &ErrTaken{Nodes: taken}
    }
    return n, err
}


它會去在多個redis節加鎖,加鎖成功n++,失敗疊加錯誤資訊

這個acquire(xxx)方法,我們就熟悉了,使用SetNX命令去redis設定值,加了過期時間,防止死鎖。

func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
    conn, err := pool.Get(ctx)
    if err != nil {
       return false, err
    }
    defer conn.Close()
    reply, err := conn.SetNX(m.name, value, m.expiry)
    if err != nil {
       return false, err
    }
    return reply, nil
}


釋放鎖

釋放鎖是跟上面加鎖中流程中的釋放鎖呼叫的是同一個方法actOnPoolsAsync,方法的內容如下:

func (m *Mutex) UnlockContext(ctx context.Context) (bool, error) {
    n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
       return m.release(ctx, pool, m.value)
    })
    if n < m.quorum {
       return false, err
    }
    return true, nil
}


我們來看看release是怎麼操作redis的?

func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
    conn, err := pool.Get(ctx)
    if err != nil {
       return false, err
    }
    defer conn.Close()
    status, err := conn.Eval(deleteScript, m.name, value)
    if err != nil {
       return false, err
    }
    if status == int64(-1) {
       return false, ErrLockAlreadyExpired
    }
    return status != int64(0), nil
}

var deleteScript = redis.NewScript(1, `
    local val = redis.call("GET", KEYS[1])
    if val == ARGV[1] then
       return redis.call("DEL", KEYS[1])
    elseif val == false then
       return -1
    else
       return 0
    end
`)


它是去執行的一個lua命令,它會去獲取key對應的值,並判斷值是否跟當前值相等,相等的話就刪除key

當透過key的value值的判斷來確保誤刪的情況,那什麼時候會出現誤刪的情況呢?如圖

Go-Redsync包底層原始碼實現

當程序a加鎖成功後,執行的業務程式碼過久,在a釋放鎖之前,鎖就過期了,如果這個時候程序b去加鎖,是會加鎖成功的,如果a再去釋放鎖就會錯誤的釋放掉b加的鎖,出現了誤刪的情況。 爲了防止誤刪的情況,會在刪除鎖之前進行一個value值的判斷,確保每次加鎖釋放鎖成功都是一個程序的操作。

續租鎖

當我們不能預估業務執行的完成時間,那這個時候我們設定鎖的超時時間就不好預估,這個時候,續租鎖就可以解決這個問題。

封裝續約方法,當我們在獲取到鎖後,就非同步執行我們的業務程式碼,並且定時去執行redsync包提供的ExtendContext方法進行續約

//LockWithTTL 鎖的key  ttl 過期時間 handle:業務函式
func LockWithTTL(ctx context.Context, key string, ttl time.Duration,
    handle func(ctx context.Context)) error {

    if ttl < 500*time.Millisecond {
       return errors.New("ttl time less than 500 Millsecond")
    }

    if Redis == nil {
       return errors.New("please init redis client")
    }

    //可以初始化多個redis
    locker := redsync.New(goredis.NewPool(Redis))
    mutex := locker.NewMutex(key, redsync.WithTries(1), redsync.WithExpiry(ttl))
    if err := mutex.LockContext(ctx); err != nil {
       return err
    }

    defer func() {
       if _, err := mutex.UnlockContext(ctx); err != nil {
          logrus.WithError(err).Errorln()
       }
    }()

    //業務函式結束chan
    waitChain := make(chan struct{})
    
    //續約定時頻率預設為500毫秒 
    checkDuration := 500 * time.Millisecond
    ticker := time.NewTicker(checkDuration)
    cancelCtx, cancel := context.WithCancel(ctx)

    go func(waitChain chan<- struct{}, fn func(ctx context.Context)) {
        //執行我們的業務程式
       fn(cancelCtx)
       
       //結束續約
       close(waitChain)
       ticker.Stop()
    }(waitChain, handle)

    for {
       select {
       case <-ticker.C:
         //到期時間-當前時間 如果小於等於了續約的定時頻率,那麼當前週期就去續約
          diff := mutex.Until().Sub(time.Now())
          if diff <= checkDuration {
              //redsync提供的續約方法
             if _, err := mutex.ExtendContext(cancelCtx); err != nil {
                logrus.WithError(err).Errorln()
                cancel()
                return err
             }
          }
       case <-waitChain:
          ticker.Stop()
          return nil
       case <-cancelCtx.Done():
          return nil
       }
    }
}


我們看看ExtendContext方法是怎麼實現的?

func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {
    start := time.Now()
    n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
       return m.touch(ctx, pool, m.value, int(m.expiry/time.Millisecond))
    })
    if n < m.quorum {
       return false, err
    }
    now := time.Now()
    until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
    if now.Before(until) {
       m.until = until
       return true, nil
    }
    return false, ErrExtendFailed
}

func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {
    conn, err := pool.Get(ctx)
    if err != nil {
       return false, err
    }
    defer conn.Close()

    touchScript := touchScript
    if m.setNXOnExtend {
       touchScript = touchWithSetNXScript
    }

    status, err := conn.Eval(touchScript, m.name, value, expiry)
    if err != nil {
       // extend failed: clean up locks
       _, _ = func() (int, error) {
          ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
          defer cancel()
          return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
             return m.release(ctx, pool, value)
          })
       }()
       return false, err
    }
    return status != int64(0), nil
}

//最後在redis執行的lua指令碼
var touchWithSetNXScript = redis.NewScript(1, `
    if redis.call("GET", KEYS[1]) == ARGV[1] then
       return redis.call("PEXPIRE", KEYS[1], ARGV[2])
    elseif redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX") then
       return 1
    else
       return 0
    end
`)

//最後在redis執行的lua指令碼
var touchScript = redis.NewScript(1, `
    if redis.call("GET", KEYS[1]) == ARGV[1] then
       return redis.call("PEXPIRE", KEYS[1], ARGV[2])
    else
       return 0
    end
`)


  1. 透過actOnPoolsAsync方式去每個redis執行touch方法,返回執行成功的次數和err;

  2. touch方法會根據setNXOnExtend 引數是否設定去執行對應的lua指令碼,setNXOnExtend為false執行touchScript指令碼為true執行touchWithSetNXScript

  3. touchScript作用:如果獲取的value值跟當前的值一樣,就去更新過期時間。

  4. touchWithSetNXScript作用:獲取的value值跟當前的值一樣,就去更新過期時間,如果獲取的value值跟當前的值不一樣,會去設定key的值並且初始化過期時間,類似update或者insert的作用;

總結

  1. redsync包使用redlcoak演算法解決的多個redis的分散式鎖;

  2. 刪除或者續約採用隨機數的判斷,實現了誤刪或者誤續的情況;

  3. 增加重試機制,提高獲取鎖的效能;

  4. 透過續租方法,解決長時間執行的任務,導致鎖過期的情況;

參考

【Redisson–紅鎖(Redlock)–使用/原理】cloud.tencent.com

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.