refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-01-03 23:57:33 +08:00
parent 74df121cf5
commit 1e3aad8883
1 changed files with 12 additions and 14 deletions

View File

@ -32,26 +32,24 @@ int32_t tqScanWal(STQ* pTq) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
while (1) { while (1) {
int32_t scan = pMeta->walScanCounter; tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, pMeta->walScanCounter);
tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan);
// check all tasks // check all tasks
bool shouldIdle = true; bool shouldIdle = true;
doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle); doScanWalForAllTasks(pMeta, &shouldIdle);
// if (shouldIdle) {
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
int32_t times = (--pMeta->walScanCounter); int32_t times = (--pMeta->walScanCounter);
ASSERT(pMeta->walScanCounter >= 0); ASSERT(pMeta->walScanCounter >= 0);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
if (times <= 0) { if (times > 0) {
break;
} else {
tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION); tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
} else { // times <= 0
break;
} }
// }
// todo: remove the sleep
taosMsleep(SCAN_WAL_IDLE_DURATION); taosMsleep(SCAN_WAL_IDLE_DURATION);
} }