fix(stream): reset the tasks status before start all tasks, and do some other internal refactor.

This commit is contained in:
Haojun Liao 2024-01-16 14:32:45 +08:00
parent 4c119c7626
commit 80793c94c2
7 changed files with 76 additions and 37 deletions

View File

@ -32,8 +32,9 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
bool isLeader, bool restored); bool isLeader, bool restored);
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks); int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta);
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);

View File

@ -874,6 +874,8 @@ void streamMetaStartHb(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready); int64_t endTs, bool ready);
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta);

View File

@ -145,8 +145,7 @@ FAIL:
} }
int32_t sndInit(SSnode *pSnode) { int32_t sndInit(SSnode *pSnode) {
int32_t numOfTasks = 0; streamMetaResetTaskStatus(pSnode->pMeta);
tqStreamTaskResetStatus(pSnode->pMeta, &numOfTasks);
streamMetaStartAllTasks(pSnode->pMeta); streamMetaStartAllTasks(pSnode->pMeta);
return 0; return 0;
} }

View File

@ -175,11 +175,11 @@ int32_t tqStopStreamTasksAsync(STQ* pTq) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) { if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to stop tasks, code:%s", vgId, terrstr()); tqError("vgId:%d failed to create msg to stop tasks async, code:%s", vgId, terrstr());
return -1; return -1;
} }
tqDebug("vgId:%d create msg to stop tasks", vgId); tqDebug("vgId:%d create msg to stop all tasks async", vgId);
pRunReq->head.vgId = vgId; pRunReq->head.vgId = vgId;
pRunReq->streamId = 0; pRunReq->streamId = 0;

View File

@ -103,8 +103,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL || *ppTask == NULL) { if (ppTask == NULL || *ppTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", vgId, tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped", vgId, req.taskId);
req.taskId);
rsp.code = TSDB_CODE_SUCCESS; rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
@ -124,6 +123,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", idstr, req.transId); tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", idstr, req.transId);
} }
// duplicate update epset msg received, discard this redundant message
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId}; STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
@ -135,7 +135,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
return rsp.code; return rsp.code;
} }
// the following two functions should not be executed within the scope of meta lock to avoid deadlock
streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamTaskResetStatus(pTask); streamTaskResetStatus(pTask);
@ -159,11 +158,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
streamMetaSaveTask(pMeta, *ppHTask); streamMetaSaveTask(pMeta, *ppHTask);
} }
#if 0
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
#endif
} else { } else {
tqDebug("s-task:%s vgId:%d not save since restore not finish", idstr, vgId); tqDebug("s-task:%s vgId:%d not save since restore not finish", idstr, vgId);
} }
@ -171,7 +165,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
tqDebug("s-task:%s vgId:%d start to stop task after save task", idstr, vgId); tqDebug("s-task:%s vgId:%d start to stop task after save task", idstr, vgId);
streamTaskStop(pTask); streamTaskStop(pTask);
// keep the already handled info // keep the already updated info
taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0); taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
if (ppHTask != NULL) { if (ppHTask != NULL) {
@ -200,6 +194,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
updateTasks, (numOfTasks - updateTasks)); updateTasks, (numOfTasks - updateTasks));
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
} else { } else {
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
if (!restored) { if (!restored) {
tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId); tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.tasksWillRestart = 0; pMeta->startInfo.tasksWillRestart = 0;
@ -686,16 +684,16 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
return 0; return 0;
} }
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks) { int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
*numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, *numOfTasks); tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
if (*numOfTasks == 0) { if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
for (int32_t i = 0; i < (*numOfTasks); ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
@ -754,8 +752,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
} }
if (isLeader && !tsDisableStream) { if (isLeader && !tsDisableStream) {
int32_t numOfTasks = 0; streamMetaResetTaskStatus(pMeta);
tqStreamTaskResetStatus(pMeta, &numOfTasks);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
streamMetaStartAllTasks(pMeta); streamMetaStartAllTasks(pMeta);
@ -965,8 +962,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
} else if (status == TASK_STATUS__UNINIT) { } else if (status == TASK_STATUS__UNINIT) {
// todo: fill-history task init ? // todo: fill-history task init ?
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
EStreamTaskEvent event = /*HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT; streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
streamTaskHandleEvent(pTask->status.pSM, event);
} }
} }
@ -990,4 +986,11 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
} }
return code; return code;
}
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
streamMetaRLock(pMeta);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
streamMetaRUnLock(pMeta);
return num;
} }

View File

@ -570,9 +570,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else { } else {
vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId); vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId);
int32_t numOfTasks = 0; int32_t numOfTasks = tqStreamTasksGetTotalNum(pMeta);
tqStreamTaskResetStatus(pMeta, &numOfTasks);
if (numOfTasks > 0) { if (numOfTasks > 0) {
if (pMeta->startInfo.taskStarting == 1) { if (pMeta->startInfo.taskStarting == 1) {
pMeta->startInfo.restartCount += 1; pMeta->startInfo.restartCount += 1;
@ -580,6 +578,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
pMeta->startInfo.restartCount); pMeta->startInfo.restartCount);
} else { } else {
pMeta->startInfo.taskStarting = 1; pMeta->startInfo.taskStarting = 1;
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false); tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false);
return; return;

View File

@ -1413,31 +1413,43 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
} }
} }
static SArray* prepareBeforeStartTasks(SStreamMeta* pMeta) {
streamMetaWLock(pMeta);
SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs();
streamMetaResetTaskStatus(pMeta);
streamMetaWUnLock(pMeta);
return pTaskList;
}
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stInfo("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); stInfo("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
if (numOfTasks == 0) { if (numOfTasks == 0) {
stInfo("vgId:%d start tasks completed", pMeta->vgId); stInfo("vgId:%d start tasks completed", pMeta->vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SArray* pTaskList = NULL;
streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs();
streamMetaWUnLock(pMeta);
// broadcast the check downstream tasks msg
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
SArray* pTaskList = prepareBeforeStartTasks(pMeta);
numOfTasks = taosArrayGetSize(pTaskList);
// broadcast the check downstream tasks msg
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
// todo: may be we should find the related fill-history task and set it failed.
// todo: use hashTable instead
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) { if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
@ -1445,8 +1457,6 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
continue; continue;
} }
// todo: may be we should find the related fill-history task and set it failed.
// fill-history task can only be launched by related stream tasks. // fill-history task can only be launched by related stream tasks.
STaskExecStatisInfo* pInfo = &pTask->execInfo; STaskExecStatisInfo* pInfo = &pTask->execInfo;
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
@ -1493,10 +1503,13 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
int32_t num = taosArrayGetSize(pMeta->pTaskList); int32_t num = taosArrayGetSize(pMeta->pTaskList);
stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num); stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
if (num == 0) { if (num == 0) {
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
streamMetaRUnLock(pMeta); streamMetaRUnLock(pMeta);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int64_t st = taosGetTimestampMs();
// send hb msg to mnode before closing all tasks. // send hb msg to mnode before closing all tasks.
SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta); SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta);
int32_t numOfTasks = taosArrayGetSize(pTaskList); int32_t numOfTasks = taosArrayGetSize(pTaskList);
@ -1514,6 +1527,9 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
taosArrayDestroy(pTaskList); taosArrayDestroy(pTaskList);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);
streamMetaRUnLock(pMeta); streamMetaRUnLock(pMeta);
return 0; return 0;
} }
@ -1640,4 +1656,23 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
}
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stDebug("vgId:%d reset all %d stream task(s) status to be uninit", pMeta->vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
streamTaskResetStatus(*pTask);
}
return 0;
} }