refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-04-18 17:01:09 +08:00
parent 9f16802451
commit cc67b631f6
1 changed files with 7 additions and 3 deletions

View File

@ -29,18 +29,21 @@ int tqStreamTasksScanWal(STQ* pTq) {
while (1) {
int32_t scan = pMeta->walScan;
tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan);
ASSERT(scan >= 1);
// check all restore tasks
bool allFull = true;
streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &allFull);
bool shouldIdle = true;
streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle);
int32_t times = 0;
if (allFull) {
if (shouldIdle) {
taosWLockLatch(&pMeta->lock);
pMeta->walScan -= 1;
times = pMeta->walScan;
ASSERT(pMeta->walScan >= 0);
if (pMeta->walScan <= 0) {
taosWUnLockLatch(&pMeta->lock);
break;
@ -49,6 +52,7 @@ int tqStreamTasksScanWal(STQ* pTq) {
taosWUnLockLatch(&pMeta->lock);
tqDebug("vgId:%d scan wal for stream tasks for %d times", vgId, times);
} else {
tqDebug("vgId:%d no idle, scan wal for stream tasks for %d times", vgId, pMeta->walScan);
ASSERT(pMeta->walScan >= 1);
}
}