fix(stream): check if in restart procedure in call back.

This commit is contained in:
Haojun Liao 2024-01-05 22:16:58 +08:00
parent 2386f842fb
commit 522e688387
2 changed files with 18 additions and 12 deletions

View File

@ -774,22 +774,27 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
streamMetaWLock(pMeta);
int32_t vgId = pMeta->vgId;
streamMetaWLock(pMeta);
if (pStartInfo->taskStarting == 1) {
tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
pMeta->startInfo.restartCount);
} else { // not in starting procedure
if (pStartInfo->restartCount > 0) {
pStartInfo->restartCount -= 1;
ASSERT(pStartInfo->taskStarting == 0);
tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", pMeta->vgId, pMeta->role,
tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role,
pStartInfo->restartCount);
streamMetaWUnLock(pMeta);
restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
return TSDB_CODE_SUCCESS;
} else {
streamMetaWUnLock(pMeta);
tqDebug("vgId:%d start all tasks completed in callbackFn", pMeta->vgId);
}
}
streamMetaWUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}

View File

@ -157,6 +157,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
*numOfBlocks = 0;
*blockSize = 0;
// todo remove it
// no available token in bucket for sink task, let's wait for a little bit
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);