temporal
Do not block shard ownership assertion if `acquireShards` is blocked
v1.17.x logic:
The shard controller starts a background goroutine that periodically asserts shard ownership:
for {
	select {
	...
	case <-acquireTicker.C:
		c.acquireShards()
	...
	}
}
If acquireShards blocks for whatever reason, the loop cannot handle the next tick, so no further acquireShards call is executed.
The OSS team should consider the following logic:
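To illustrate the stall, here is a minimal, self-contained sketch. It is not the Temporal code: `handledWhileBlocked`, the `ticks` channel, and the `release` channel are hypothetical stand-ins. A send with a timeout plays the role of a ticker whose ticks are dropped when the receiver is busy, and `<-release` plays the role of an `acquireShards` call that is stuck.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// handledWhileBlocked offers n ticks to a loop whose handler is stuck and
// returns how many ticks the loop actually picked up.
func handledWhileBlocked(n int) int {
	ticks := make(chan struct{})   // hypothetical stand-in for acquireTicker.C
	release := make(chan struct{}) // closed only after all ticks were offered
	var started int32

	go func() {
		for range ticks {
			atomic.AddInt32(&started, 1)
			<-release // stand-in for an acquireShards call that does not return
		}
	}()

	for i := 0; i < n; i++ {
		select {
		case ticks <- struct{}{}:
			// The loop was idle and accepted the tick.
		case <-time.After(50 * time.Millisecond):
			// The loop is still inside the handler, so this tick is lost.
		}
	}

	close(release)
	close(ticks)
	return int(atomic.LoadInt32(&started))
}

func main() {
	fmt.Println("ticks handled while the handler was stuck:", handledWhileBlocked(5))
}
```

Only the first tick reaches the handler; every later tick is offered while the loop is still inside the stuck call.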
type (
	ShardController struct {
		...
		shardAssertionInProgressMutex sync.Mutex
		shardAssertionInProgress      map[int32]struct{}
		shardIDChan                   chan int32
		...
	}
)
func (sc *ShardController) eventLoop() {
	acquireTicker := time.NewTicker(sc.config.AcquireShardInterval())
	defer acquireTicker.Stop()
	for {
		select {
		case <-sc.shutdownCh:
			return
		case <-acquireTicker.C:
			for i := int32(0); i < totalShardIDs; i++ {
				// Mark the shard as in progress unless an assertion is already running for it.
				sc.shardAssertionInProgressMutex.Lock()
				_, exist := sc.shardAssertionInProgress[i]
				if !exist {
					sc.shardAssertionInProgress[i] = struct{}{}
				}
				sc.shardAssertionInProgressMutex.Unlock()
				// Enqueue only shards that were idle; shards still being asserted are skipped.
				if !exist {
					sc.shardIDChan <- i
				}
			}
		}
	}
}
func (sc *ShardController) acquireShard() {
	for shardID := range sc.shardIDChan {
		...
		// shard ownership assertion logic here
		...
		// Clear the in-progress mark so the next tick can enqueue this shard again.
		sc.shardAssertionInProgressMutex.Lock()
		delete(sc.shardAssertionInProgress, shardID)
		sc.shardAssertionInProgressMutex.Unlock()
	}
}
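The proposal above can be exercised in isolation with a small runnable sketch. Everything in it is an assumption for illustration: `controller`, `enqueueIdle`, `worker`, `simulate`, and the `done` channel are hypothetical names, the `assert` callback replaces the real ownership assertion, and the ticker is replaced by two explicit "waves". Shard 2 is made to hang so the second wave can show that the other shards are still asserted.

```go
package main

import (
	"fmt"
	"sync"
)

// controller is a hypothetical, stripped-down stand-in for ShardController.
type controller struct {
	mu         sync.Mutex
	inProgress map[int32]struct{}
	shardIDs   chan int32
	done       chan int32          // test hook: receives a shard ID after each assertion
	assert     func(shardID int32) // stand-in for the ownership assertion
}

func newController(numShards int, assert func(int32)) *controller {
	return &controller{
		inProgress: make(map[int32]struct{}),
		// A shard is queued at most once at a time, so this buffer never fills.
		shardIDs: make(chan int32, numShards),
		done:     make(chan int32, 2*numShards),
		assert:   assert,
	}
}

// enqueueIdle queues every shard with no assertion in flight and returns the count.
func (c *controller) enqueueIdle(numShards int32) int {
	queued := 0
	for id := int32(0); id < numShards; id++ {
		c.mu.Lock()
		_, busy := c.inProgress[id]
		if !busy {
			c.inProgress[id] = struct{}{}
		}
		c.mu.Unlock()
		if !busy {
			c.shardIDs <- id
			queued++
		}
	}
	return queued
}

// worker asserts shards from the queue and clears their in-progress mark.
func (c *controller) worker(wg *sync.WaitGroup) {
	defer wg.Done()
	for id := range c.shardIDs {
		c.assert(id)
		c.mu.Lock()
		delete(c.inProgress, id)
		c.mu.Unlock()
		c.done <- id
	}
}

// simulate runs two waves while shard 2 hangs and returns how many shards each wave queued.
func simulate() (first, second int) {
	const numShards = 4
	stuck := make(chan struct{})
	c := newController(numShards, func(id int32) {
		if id == 2 {
			<-stuck // shard 2 stands in for an assertion that hangs
		}
	})
	var wg sync.WaitGroup
	for i := 0; i < numShards; i++ {
		wg.Add(1)
		go c.worker(&wg)
	}
	first = c.enqueueIdle(numShards)
	for i := 0; i < numShards-1; i++ {
		<-c.done // wait for the three healthy shards to finish
	}
	second = c.enqueueIdle(numShards) // shard 2 is still marked in progress
	close(stuck)
	close(c.shardIDs)
	wg.Wait()
	return first, second
}

func main() {
	first, second := simulate()
	fmt.Println("first wave queued:", first, "second wave queued:", second)
}
```

In this sketch the first wave queues all four shards and the second wave queues three, skipping only the shard whose assertion is still stuck. Note that the isolation depends on having more workers than stuck shards: with a single worker, one hanging assertion would still stall the whole queue.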
I like that idea. I agree there's no reason to wait for one "wave" to finish before starting the next one. But also look at #3108, which will make acquireShards no longer block on individual shard rwlocks; that might be enough that this doesn't matter.