Merge branch 'fix/3_liaohj' of https://github.com/taosdata/TDengine into fix/3_liaohj

This commit is contained in:
Yihao Deng 2024-01-15 08:27:55 +00:00
commit edbbd2282c
3 changed files with 30 additions and 4 deletions

View File

@ -885,6 +885,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
// checkpoint // checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -833,23 +833,31 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
STaskStartInfo* pStartInfo = &pMeta->startInfo; STaskStartInfo* pStartInfo = &pMeta->startInfo;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
if (pStartInfo->taskStarting == 1) { if (pStartInfo->taskStarting == 1) {
tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId, tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
pMeta->startInfo.restartCount); pMeta->startInfo.restartCount);
} else { // not in starting procedure } else { // not in starting procedure
if (pStartInfo->restartCount > 0) { bool allReady = streamMetaAllTasksReady(pMeta);
if ((pStartInfo->restartCount > 0) && (!allReady)) {
// if all tasks are ready now, do NOT restart again, and reset the value of pStartInfo->restartCount
pStartInfo->restartCount -= 1; pStartInfo->restartCount -= 1;
tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role, tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role,
pStartInfo->restartCount); pStartInfo->restartCount);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER)); restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
tqDebug("vgId:%d start all tasks completed in callbackFn", pMeta->vgId); if (pStartInfo->restartCount == 0) {
tqDebug("vgId:%d start all tasks completed in callbackFn, restartCount is 0", pMeta->vgId);
} else if (allReady) {
pStartInfo->restartCount = 0;
tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
}
} }
} }

View File

@ -1518,6 +1518,23 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
return 0; return 0;
} }
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
int32_t num = taosArrayGetSize(pMeta->pTaskList);
for(int32_t i = 0; i < num; ++i) {
STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pTaskId, sizeof(*pTaskId));
if (ppTask == NULL) {
continue;
}
if ((*ppTask)->status.downstreamReady == 0) {
return false;
}
}
return true;
}
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
stInfo("vgId:%d start to task:0x%x by checking downstream status", vgId, taskId); stInfo("vgId:%d start to task:0x%x by checking downstream status", vgId, taskId);