接續上一篇文章:GraphQL Dataloader 在 Golang 當中的打開方法與原理解析(上) 。
在進入原始碼之前,先複習一下 use case。首先從 resolver 開始看起:
1
2
3
4
5
// IconURL is the resolver for the icon_url field.
func ( r * roomFeatureResolver ) IconURL ( ctx context . Context , obj * model . RoomFeature ) ( * string , error ) {
f , _ := storage . GetFeature ( ctx , * obj . ID )
return f . IconUrl , nil
}
當中呼叫了我們的自定義方法 storage.GetFeature
:
1
2
3
4
5
6
7
func GetFeature ( ctx context . Context , featureID int ) ( * model . RoomFeatureWithData , error ) {
//...
thunk := loaders . FeatureLoader . Load ( ctx , DataLoader . StringKey ( fmt . Sprintf ( "%d" , featureID )))
result , err := thunk ()
//...
return result .( * model . RoomFeatureWithData ), nil
}
重點就是這裡使用到的 loaders.FeatureLoader.Load
方法。
FeatureLoader
雖為自定義屬性,但是透過 DataLoader 套件提供的 *DataLoader.Loader
建構子 DataLoader.NewBatchedLoader
所創建出,所以本身就具備了 Load 方法。
在深入探究 Load
方法之前,先釐清一下這個 GetFeature
方法什麼時候會被執行到。回到一開始的案例,
當 Client 進行如下 Query 時:
1
2
3
4
5
6
7
8
9
10
query {
content ( hotel_id : 1 ) {
rooms {
features {
id
icon_url
}
}
}
}
如果有 n 個 rooms,每個 room 有 m 個 features,總共會執行 n * m 次 IconURL
來取得每個 feature 的 icon URL,也就是執行 m * n 次 GetFeature
。而在每個 GetFeature
方法當中又呼叫了 loaders.FeatureLoader.Load
方法,透過回傳的函數取得 feature 資料。
單純以使用上的角度,可以把 Load
方法當成是一個黑盒子,當我們向他索取需要的資料時,他就會回傳給我們。
想像如果這些方法都是同步被執行的,假設第一次傳入 GetFeature
的 feature id 為 1,此時要向 DataLoader 取 id 為 1 的 feature 資料,照理來說應該還沒有辦法取得,因為這時候 DataLoader 尚未蒐集到所有 rooms 的 features,(要搜集完所有 id 才會向資料庫查詢),所以應該會暫時 block 住,等待所有 id 蒐集完後才取得所有資料,才能取得 id 1 的資料。可想而知,這個功能背後一定是有非同步 的支持才有可能辦到。
所以到底 DataLoader 是在什麼時候蒐集到所有 rooms 的 features 每個 id 的?又是在什麼時候呼叫向資料庫查詢的方法?就來看看這個 Load
方法到底做了什麼吧!
然後因為這個方法有點長,所以我會按照順序分段貼上來,但不會全部貼,比較非核心功能的部分也會暫時省略。有興趣的讀者可以自行去看原始碼。
首先創建一個接收結果的 channel,以及一個 result 變數,接著試圖到 Loader
的 cache 當中尋找 key 對應的值存不存在,有的話就直接返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//github.com/graph-gophers/DataLoader@v5.0.0+incompatible/DataLoader.go
func ( l * Loader ) Load ( originalContext context . Context , key Key ) Thunk {
ctx , finish := l . tracer . TraceLoad ( originalContext , key )
c := make ( chan * Result , 1 )
var result struct {
mu sync . RWMutex
value * Result
}
l . cacheLock . Lock ()
if v , ok := l . cache . Get ( ctx , key ); ok {
defer finish ( v )
defer l . cacheLock . Unlock ()
return v
}
//...下面續
}
返回值是 Thunk
類型,其實就是一個這樣形狀的 function:
1
type Thunk func () ( interface {}, error )
接著,當在 cache 當中找不到對應值時,就新建一個 thunk 函數,當中會使用上面建立的 result 變數,如果 result 裡的值為空,就一直等待直到有資料傳進 channel, 然後將 result 的值更新為傳入的 value。最後寫入 cache。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func ( l * Loader ) Load ( originalContext context . Context , key Key ) Thunk {
// ...接續上面
thunk := func () ( interface {}, error ) {
result . mu . RLock () //上讀鎖,避免在讀取資料時,同時有其他線程想更新該資料
resultNotSet := result . value == nil
result . mu . RUnlock () // 釋放讀鎖
if resultNotSet {
result . mu . Lock () //上獨占鎖,避免在更新資料時,同時有其他線程想更新該資料
if v , ok := <- c ; ok { // block 住直到 c 有 result 傳入
result . value = v
}
result . mu . Unlock () //釋放獨佔鎖
}
result . mu . RLock ()
defer result . mu . RUnlock ()
return result . value . Data , result . value . Error
}
defer finish ( thunk )
l . cache . Set ( ctx , key , thunk )
l . cacheLock . Unlock ()
//...下面續
}
再來,新增一個 batchRequest,並檢查 loader 當中的 curBatcher 存在與否,不存在就創建新的。這個 curBatcher 內部會有執行自定義方法——也就是在 NewBatchedLoader 時傳入的 featureReader.GetFeature
方法——的邏輯。
接著另開一個 goroutine 執行 curBatcher 的 batch
方法之後,又開了一個 goroutine 執行 loader 的 sleeper 方法。
然後把 batchRequest 傳入到了 curBatcher 的 input channel,最後回傳 thunk 函數:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
type batchRequest struct {
key Key
channel chan * Result
}
type batcher struct {
input chan * batchRequest
batchFn BatchFunc
//...
}
func ( l * Loader ) Load ( originalContext context . Context , key Key ) Thunk {
//...接續上面
// req 包含了要查詢的 key 和 等待接收結果的 channel
req := & batchRequest { key , c }
l . batchLock . Lock ()
if l . curBatcher == nil {
l . curBatcher = l . newBatcher ( l . silent , l . tracer )
go l . curBatcher . batch ( originalContext )
l . endSleeper = make ( chan bool )
go l . sleeper ( l . curBatcher , l . endSleeper )
}
l . curBatcher . input <- req
// 如果有額外設定 batchCap,可以限制一次最多可以接受的 req 數量
if l . batchCap > 0 { // 但我們並沒有設定,所以 batchCap 為 0,不會進入這邊
//...
}
l . batchLock . Unlock ()
return thunk
}
這時,發現邏輯進到了 curBatcher.batch
方法當中,所以緊接著來看這段方法:
逐一接收 input 的資料之後,終於看到了執行 b.batchFn(ctx, keys)
也就是我們自定義的資料庫查詢方法!
然後就是把查詢到的資料,依序放入每個 request 的 channel,再關閉 channel,如此一來 thunk 函數當中等待該 channel 傳回 result 那一行就會取得值而繼續往下執行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func ( b * batcher ) batch ( originalContext context . Context ) {
var (
keys = make ( Keys , 0 )
reqs = make ([] * batchRequest , 0 )
items = make ([] * Result , 0 )
panicErr interface {}
)
// keys 的順序 等同 reqs 的順序
for item := range b . input {
keys = append ( keys , item . key )
reqs = append ( reqs , item )
}
ctx , finish := b . tracer . TraceBatch ( originalContext , keys )
defer finish ( items )
func () {
// ...
items = b . batchFn ( ctx , keys ) // 執行自定義的資料庫查詢方法
}()
// ...
// 因為自定義方法當中也有依照 keys 的順序回傳 result,且 keys 的順序 等同 reqs 的順序
// 所以這樣放入順序不會亂
for i , req := range reqs {
req . channel <- items [ i ]
close ( req . channel )
}
}
但是別忘了,b.input
也是一個 channel,這邊又使用了 for..range 語法來遍歷 channel,照理來說,除非別的 goroutine 當中有把這個 channel 給關了,否則應該會造成 deadlock 才對。而在哪裡會將 input 這個 channel 關閉呢?
總共有兩個地方都在 Load 方法當中,一個是當 l.batchCap > 0
時,如果符合指定條件(執行的 req 數量等於 batchCap 數量時),會呼叫
l.curBatcher.end()
關閉 input chan。
1
2
3
4
5
6
7
8
if l . batchCap > 0 {
//...
if l . count == l . batchCap {
l . curBatcher . end () // 直接終止接收 req,不再繼續等待
close ( l . endSleeper ) // 關閉等待特定一段時間的 channel
l . reset ()
}
}
另一個是位於 go l.sleeper(l.curBatcher, l.endSleeper)
這一行,所以還得再看一下這個方法做了什麼才行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func ( l * Loader ) sleeper ( b * batcher , close chan bool ) {
select {
// 只有在 Load 方法中進入 `l.batchCap > 0` 的 if 區塊時,會對 close channel 傳送訊號
// 這邊收到訊號表示已經提前關閉了 input channel,也不需繼續等待下去
case <- close :
return
case <- time . After ( l . wait ):
}
l . batchLock . Lock ()
b . end () // <-----here
//...
}
func ( b * batcher ) end () {
if ! b . finished {
close ( b . input )
b . finished = true
}
}
可以看到,當等待時間一到(time.After(l.wait)
收到訊號後),就會往下執行到 b.end()
,也就是關閉 input channel 的方法,而等待的時間 l.wait
預設值是 16ms:
1
2
3
4
5
6
7
8
func NewBatchedLoader ( batchFn BatchFunc , opts ... Option ) * Loader {
loader := & Loader {
batchFn : batchFn ,
inputCap : 1000 ,
wait : 16 * time . Millisecond , // <-----here
}
//...
}
也就是說,如果 loader 的 input channel 在 16ms 之後都沒有再收到 request 的話,就會自動關閉了。
總結看完以上的原始碼之後,算是揭開了 DataLoader 的黑盒子,理解了它的運作原理。最後再重點整理一下 Dataloader 運作的流程:
當第一次調用 loader 的 Load
方法時,loader 內部會開啟一個 input channel 來接收查詢資料的 request,並開始等待 request 傳入。 每次調用 Load
方法時,都會建立一個 request(裏面包含 key 和一個接收查詢結果的 channel)並將其傳入 input channel。 Load
方法返回一個匿名函數,函數內部如果沒有可以立即返回的結果,會試圖從 request channel 中取值,因此在尚未有值送入 channel 之前會暫時阻塞。loader 的 input channel 會等待一段時間(預設為 16ms),時間一到就關閉接收 request。(如果有設定接收 request 的最大數量,也有可能提前關閉) input channel 關閉後,就會將所有搜集到的 request keys 傳給自訂批次處理方法,取得所有資料(為 key - value 形式)。 資料載入完成後,將這些資料依照 key 推進對應的 request channel,接收到該 channel 回傳值的匿名函數就會停止阻塞,將結果緩存後,回傳給 client。