fix(stream): add task number check.

This commit is contained in:
Haojun Liao 2024-01-07 01:32:50 +08:00
parent 5e32aea1f9
commit 29219ba9f4
4 changed files with 16 additions and 11 deletions

View File

@ -32,7 +32,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
bool isLeader, bool restored); bool isLeader, bool restored);
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta); int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks);
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);

View File

@ -145,7 +145,8 @@ FAIL:
} }
int32_t sndInit(SSnode *pSnode) { int32_t sndInit(SSnode *pSnode) {
tqStreamTaskResetStatus(pSnode->pMeta); int32_t numOfTasks = 0;
tqStreamTaskResetStatus(pSnode->pMeta, &numOfTasks);
streamMetaStartAllTasks(pSnode->pMeta); streamMetaStartAllTasks(pSnode->pMeta);
return 0; return 0;
} }

View File

@ -706,16 +706,16 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
return 0; return 0;
} }
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta) { int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); *numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks); tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, *numOfTasks);
if (numOfTasks == 0) { if (*numOfTasks == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < (*numOfTasks); ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
@ -767,8 +767,10 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
} }
if (isLeader && !tsDisableStream) { if (isLeader && !tsDisableStream) {
tqStreamTaskResetStatus(pMeta); int32_t numOfTasks = 0;
tqStreamTaskResetStatus(pMeta, &numOfTasks);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
streamMetaStartAllTasks(pMeta); streamMetaStartAllTasks(pMeta);
} else { } else {
streamMetaResetStartInfo(&pMeta->startInfo); streamMetaResetStartInfo(&pMeta->startInfo);

View File

@ -570,10 +570,12 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else { } else {
vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId);
tqStreamTaskResetStatus(pVnode->pTq->pStreamMeta);
{ int32_t numOfTasks = 0;
tqStreamTaskResetStatus(pVnode->pTq->pStreamMeta, &numOfTasks);
if (numOfTasks > 0) {
if (pMeta->startInfo.taskStarting == 1) { if (pMeta->startInfo.taskStarting == 1) {
pMeta->startInfo.restartCount += 1; pMeta->startInfo.restartCount += 1;
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,