2020年11月16日

[go-pkg] context package

此篇為各筆記之整理,非原創內容,資料來源可見下方連結與文後參考資料:
context package 最重要的就是處理多個 goroutine 的情況,特別是用來送出取消或結束的 signal。
我們可以在一個 goroutine 中建立 Context 物件後,傳入另一個 goroutine;另一個 goroutine 即可以透過 Done() 來從該 context 中取得 signal,一旦這個 Done channel 關閉之後,這個 goroutine 即會關閉並 return。
Context 也可以是受時間控制,它也可以在特定時間後關閉該 signal channel,我們可以定義一個 deadline 或 timeout 的時間,時間到了之後,Context 物件就會關閉該 signal channel。
更好的是,一旦父層的 Context 關閉其 Done channel 之後,子層的 Done channel 則會自動關閉。

重要概念

  • 不要把 Context 保存在 struct 中,而是直接當作第一個參數傳入 function 或 goroutine 中,通常會命名為 ctx
  • server 在處理傳進來的請求時應該要建立一個 Context,而使用該 server 的方法則應該要接收 Context 作為參數
  • 雖然函式可以允許傳入 nil Context,但千萬不要這麼做,如果你不確定要用哪個 Context,可以使用 context.TODO
  • 只在 request-scoped data 這種要交換處理資料或 API 的範疇下使用 context Values,不要傳入 optional parameters 到函式中。
  • 相同的 Context 可以傳入多個不同的 goroutine 中使用,在多個 goroutines 中同時使用 Context 是安全的(safe)
func DoSomething(ctx context.Context, arg Arg) error {
	// ... use ctx ...
}

context.Background()

context.Background() 會回傳一個不是 nilempty Context這個 Context 絕不會被取消(canceled)、不會有值、也不會有 deadline。這通常會用在 main function、初始化(initialization)或測試中使用,可以作為處理請求時最高層的 Context(top-level Context)。

context.TODO()

context.TODO() 會回傳一個不是 nil 的 empty Context。它通常會使用在還不清楚要使用哪個 Context 時,或還無法取得 Context 的情況下使用。

context.WithCancel()

context.WithCancel() 函式會回傳 Context 物件和 CancelFunction。這個 Context 的 Done channel 會在 cancel function 被呼叫到時關閉,或是父層的 Done channel 關閉時亦會關閉。
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
  • 重複呼叫 cancel() 不會有任何效果
  • context 建議當成函式或 goroutine 的參數傳入,並且命名為 ctx,並不建議把它保存在 struct 中
  • Context 可以有父子層的關係,也就是一個 Context 可以產生另一個 Context,但一旦父層 Context 取消/關閉時,所有根據這個 Context 所產生的 Context 也會一併關閉
// https://medium.com/rungo/understanding-the-context-package-b2e407a9cdae
func square(ctx context.Context, c chan int) {
	i := 0
	for {
		select {
		case <-ctx.Done(): // STEP 2:監聽 context Done
			return // kill goroutine
		case c <- i * i:
			i++
		}
	}
}

func main() {
	c := make(chan int)

	// STEP 1:建立可以被 cancel 的 context
	ctx, cancel := context.WithCancel(context.Background())

	go square(ctx, c)

	for i := 0; i < 5; i++ {
		fmt.Println("Next square is", <-c)
	}

	// STEP 3:當所有訊息都從 channel 取出後,使用 cancel 把 square 這個 goroutine 關閉
	cancel()

	time.Sleep(3 * time.Second)

	fmt.Println("Number of active goroutines", runtime.NumGoroutine())
}
範例:
// code modified from appleboy
// https://blog.wu-boy.com/2020/08/three-ways-to-manage-concurrency-in-go/
func startProcessA(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println(name, "Exit")
			return
		case <-time.After(1 * time.Second):
			fmt.Println(name, "keep doing something")
		}
	}
}

func main() {
	// 使用 context.WithCancel 取得 ctx 和 cancel
	ctx, cancel := context.WithCancel(context.Background())
	go startProcessA(ctx, "Process A") // 執行 goroutine 並把 context 傳入
	time.Sleep(5 * time.Second)
	fmt.Println("client release connection, need to notify Process A and exit")
	cancel() // 呼叫 cancel 方法
	fmt.Println("Process finish")
}

context.WithDeadline()

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
  • context.WithDeadline() 中可以指定一個時間(time.Time)當作 deadline,一旦時間到時,就會自動觸發 cancel
  • context.WithDeadline() 同樣會回傳 cancel,因此也可以主動呼叫 cancel
  • 如果父層的 context 被 cancel 的話,子層的 context 也會一併被 cancel
var startTime = time.Now()

func worker(ctx context.Context, durationSecs int) {
	select {
	// STEP 3:deadline 時間到時或主動呼叫 cancel 時,都會進入 ctx.Done()
	case <-ctx.Done():
		fmt.Printf("%0.2fs - worker(%ds) killed!\n", time.Since(startTime).Seconds(), durationSecs)
		return // kills goroutine

	// 模擬做事所需花費的時間
	case <-time.After(time.Duration(durationSecs) * time.Second):
		fmt.Printf("%0.2fs - worker(%ds) completed the job.\n", time.Since(startTime).Seconds(), durationSecs)
	}
}

func main() {
	// STEP 1:建立 deadline
	deadline := time.Now().Add(3 * time.Second)

	// STEP 2:將 deadline 傳入並取得 cancel
	ctx, cancel := context.WithDeadline(context.Background(), deadline)

	// STEP 4:如果 main 比其他 goroutine 提早結束時,呼叫 cancel 讓其他 goroutine 結束
	defer cancel()

	go worker(ctx, 2)
	go worker(ctx, 3)
	go worker(ctx, 4)
	go worker(ctx, 6)
	fmt.Println("Number of active goroutines", runtime.NumGoroutine())

	time.Sleep(5 * time.Second)

	fmt.Println("Number of active goroutines", runtime.NumGoroutine())
}

context.WithTimeout()

超過一定的時間後就會停止該 function
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
  • context.WithTimeout() 的用法和 context.WithDeadline() 幾乎相同,差別只在於 WithTimeout() 帶入的參數是時間區間(time.Duration
  • 實際上,WithTimeout() 的底層仍然是呼叫 WithDeadline(),只是它會幫忙做掉 time.Add() 的動作
func printFeature(client pb.RouteGuideClient, point *pb.Point) {
  // 透過 context.WithTimeout 取得 ctx 和 cancel
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

  // 把可能會花許多時間的方法帶入 ctx
	feature, err := client.GetFeature(ctx, point)

	if err != nil {
		log.Fatalf("%v.GetFeature(_) = _, %v: ", client, err)
	}
	log.Println(feature)
}

context.WithValue()

func (oAuth *OAuth) GetClient(certPath, keyPath string) (*http.Client, error) {
	sslcli, err := addTLSCertificate(certPath, keyPath)
	if err != nil {
		return nil, fmt.Errorf("add tls certificate %v", err)
	}

	ctx := context.TODO()
	ctx = context.WithValue(ctx, oauth2.HTTPClient, sslcli)
	client := oAuth.Config.Client(ctx)

	return client, nil
}

參考文章

[go-pkg] time/rate package

golang

重要概念

在 Golang 中使用 Limiter 來控制在特定頻率內,某個事件是否允許被執行。這個 Limiter 是實作 Token Bucket 的方式來達到限流的目的,也就是會先設定:
  • event rate(r)將 token 放入桶子的頻率,例如每秒將放入 n 個 token 到桶子(bucket)中。
  • burst size(b)一個桶子(bucket)中能夠容納的 token 數量
一開始桶子會是滿的,只要桶子中有剩餘的 Token 就可以取用,若沒有剩餘的 Token 則需要等待後才能取用。

建立 Limiter:NewLimiter

keywords: NewLimiter
使用 NewLimiter 來建立一個 non-zero Limiter:
func NewLimiter(r Limit, b int) *Limiter
Limiter 包含兩個主要的屬性:
  • r:rate,型別是 LimitLimit 的型別是 float64), 是用來定義**「每秒」內某事件可以發生的次數**,zero 的話表示不允許任何事件發生。可以透過 Every(interval time.Duration) Limit 這個方法來取得 Limit。
  • b:burst size,表示桶子的大小,也就是桶子中可以放入多少 Token
// r:rate,每秒會放入 10 個 token
// b:burst size,桶子的大小只能容納 1 個 token
limiter := rate.NewLimiter(10, 1)

fmt.Println(limiter.Limit(), limiter.Burst()) // 10, 1
也可使用 Every() 來產生 Limit
// func Every(interval time.Duration) Limit
//
// r:每 100 毫秒會放入 1 個 token(同樣也是每秒會有 10 個 token)
// b:桶子的大小只能容納 1 個 token
limit := rate.Every(100 * time.Millisecond)
limiter := rate.NewLimiter(limit, 1)

fmt.Println(limiter.Limit(), limiter.Burst()) // 10, 1

使用 Limiter

keywords: Allow, Reserve, Wait, AllowN, ReserveN, WaitN
Limiter 主要有三種方法,分別是 Allow, ReserveWait一般來說最常使用到的是 Wait。這三種方法都需要消耗「一個」 token,差別在於當 token 不足的時候所採取的行為
當 Token 不足時:
  • Allow:會回傳 false
  • Reserve:會回傳 Reservation,表示預約未來的 Token 並告知要等多久後才能再次使用
  • Wait:會等待那裡(阻塞),直到有足夠的 Token 或該 context 被取消。
如果需要一次消耗多個 Token,則使用 AllowN, ReserveNWaitN

Wait/WaitN

func (lim *Limiter) Wait(ctx context.Context) (err error)  // 等同於 WaitN(ctx, 1)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
  • WaitN 會阻塞住,每次執行需要消耗 n 個 token,也就是直到有足夠(n)的 token 時才會繼續往後執行
  • 在下述情況發生時會回傳錯誤
    • 如果需要消耗的 token 數目( n) 超過 Limiter 水桶的數量(burst size)時
    • Context 被取消(canceled)
    • Context 的等待時間超過 Deadline 時
// 範例程式碼:https://www.jianshu.com/p/1ecb513f7632
func main() {
  counter := 0
  ctx := context.Background()

  // 每 200 毫秒會放一次 token 到桶子(每秒會放 5 個 token 到桶子),bucket 最多容納 1 個 token
  limit := rate.Every(time.Millisecond * 200)
  limiter := rate.NewLimiter(limit, 1)
  fmt.Println(limiter.Limit(), limiter.Burst()) // 5,1

  for {
    counter++
    limiter.Wait(ctx)
    fmt.Printf("counter: %v, %v \n", counter, time.Now().Format(time.RFC3339))
  }
}

Allow/AllowN

func (lim *Limiter) Allow() bool      // 等同於 AllowN(time.Now(), 1)
func (lim *Limiter) AllowN(now time.Time, n int) bool
  • AllowN 表示在某個的時間點時,每次需要消耗 n 個 token,若桶子中的 token 數目是否滿足 n,則會回傳 true 並消耗掉桶子中的 token,否則回傳 false
  • 只有在你想要 drop / skip 超過 rate limit 的事件時使用,否則使用 ReserveWait
// 範例程式碼:https://www.jianshu.com/p/1ecb513f7632
func main() {
  counter := 0

  // event rate:每 200 毫秒會放一次 token 到桶子(每秒會放 5 個 token 桶子)
  // burst size:bucket 最多容納 4 個 token
  limit := rate.Every(time.Millisecond * 200)
  limiter := rate.NewLimiter(limit, 4)
  fmt.Println(limiter.Limit(), limiter.Burst()) // 5,4

  for {
    counter++

    // 每次需要 3 個 token
    if isAllowed := limiter.AllowN(time.Now(), 3); isAllowed {
      fmt.Printf("counter: %v, %v \n", counter, time.Now().Format(time.RFC3339))
    } else {
      fmt.Printf("counter: %v, not allow \n", counter)
      time.Sleep(100 * time.Millisecond)
    }
  }
}

Reserve/ReserveN

func (lim *Limiter) Reserve() *Reservation   // 等同於 ReserveN(time.Now(), 1)
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
  • ReserveN 會回傳 Reservation,用來指稱還需要等多久才能有足夠的 token 讓事件發生;後續的 Limiter 會把 Reservation 納入考量
  • 當 n 超過桶子能夠容納的 token 數量時(即,Limiters 的 burst size),Reservation 的 OK 方法將會回傳 false
func main() {
  counter := 0

  // event rate:每 200 毫秒會放一次 token 到桶子(每秒會放 5 個 token 桶子)
  // burst size:bucket 最多容納 3 個 token
  limit := rate.Every(time.Millisecond * 200)
  limiter := rate.NewLimiter(limit, 3)
  fmt.Println(limiter.Limit(), limiter.Burst()) // 5,3

  for {
    counter++
    // 每次執行需要 2 個 token
    tokensNeed := 2
    reserve := limiter.ReserveN(time.Now(), tokensNeed)

    // r.OK() 是 false 表示 n 的數量大於桶子能容納的數量(lim.burst)
    if !reserve.OK() {
      fmt.Printf("一次所需的 token 數(%v)大於桶子能容納 token 的數(%v)\n", tokensNeed, limiter.Burst())
      return
    }

    // reserve.Delay() 可以取得需要等待的時間
    time.Sleep(reserve.Delay())

    // 等待完後做事...
    fmt.Printf("counter: %v, %v \n", counter, time.Now().Format(time.RFC3339))
  }
}
  • r.Delay():可以得到需要等待的時間,0 則表示不用等待

調整 Limiter

keywords: SetBurst, SetLimit, SetBurstAt, SetLimitAt
如果需要動態調整 Limiter 的數率和桶子的大小,則可以使用 SetBurstSetLimit 的方法。

整合 GIN 限制向 client 發送 Request 的次數

限制特定 usecase / API 中的 limiter

usecase (API)

  • PostUsecase 的 struct 中定義 Limiter 的型別
  • router/post.go 中使用 NewLimiter() 來建立 Limiter
  • GetPost 中透過 Limiter.Wait() 來限制發送請求的頻率
// usecase/post.go

// STEP 1:在 struct 中定義 limiter,並在 router/post.go 中建立 Limiter
type PostUsecase struct {
  Limiter *rate.Limiter
}

func (p *PostUsecase) GetPost(ctx *gin.Context) {
  id := ctx.Param("id")

  // STEP 3:使用 Limiter.Wait,每次會消耗桶子中的一個 token
  p.Limiter.Wait(ctx)

  // STEP 4:實際發送請求
  post := getPost(id)

  ctx.JSON(http.StatusOK, post)
}

router

  • 使用 NewLimiter() 來建立 Limiter
    • rate.Every(200 * time.Millisecond):每 200 毫秒會放入一個 token 到桶子(bucket)中
    • rate.NewLimiter(limit, 1):桶子的容量(burst size)為 1 個 token
// router/post.go

func registerPosts(router *gin.Engine) {

  // STEP 2:使用 NewLimiter() 來建立 Limiter
  limit := rate.Every(1000 * time.Millisecond)
  limiter := rate.NewLimiter(limit, 1)
  postHandler := &usecase.PostUsecase{
    Limiter: limiter,
  }

  router.GET("/posts/:id", postHandler.GetPost)
}

限制多支 usecase / API 的 limiter

撰寫 limiter package

如果是很多不同支 API 都需要限制流量的話,則可以建立一個獨立的 package:
// ./pkg/limiter/limiter.go
package limiter

// STEP 1:建立 limiter
// rate:每秒會放 1 個 token 到 bucket 中
// burst size:桶子最多可以容納 1 個 bucket
var RateLimiter = rate.Every(time.Millisecond * 1000)
var RequestLimiter = rate.NewLimiter(RateLimiter, 1)

在 API 中使用 limiter

並在需要限流的 limiter 中使用它:
// ./usecase/post.go
package usecase

import "sandbox/gin-sandbox/pkg/limiter"

func (p *PostUsecase) GetPost(ctx *gin.Context) {

  // STEP 3:使用建立好的 limiter
  // 每次需要消耗桶子中的 1 個 bucket
  limiter.RequestLimiter.Wait(ctx)

  post := getPost(id)

  ctx.JSON(http.StatusOK, post)
}
在另一支需要限流的 API 中使用寫好的 limiter:
// ./usecase/healthcheck.go

import "sandbox/gin-sandbox/pkg/limiter"

package usecase

func (h *HealthCheckUsecase) Pong(ctx *gin.Context) {

  limiter.RequestLimiter.Wait(ctx)

  ctx.JSON(http.StatusOK, gin.H{
    "message":    "pong",
    "threadNum,": threadNum,
    "counter":    counter,
  })
}

使用 JMeter 測試結果

若我們的 Limiter 限制每秒給一個 token 到 bucket 中,且 bucket 的 burst size(能夠容納的 token 數量)為 1 時,表示每秒只能處理一個請求。
若以 JMeter 進行測試,可以看到 Throughput(流量)的欄位即為 1.0/sec
Screen Shot 2020-11-16 at 4.31.57 PM

範例程式碼

參考