fix(stream): reset checkpoint info after receiving task-reset info.

This commit is contained in:
Haojun Liao 2024-04-17 17:57:12 +08:00
parent 34b718bf4d
commit 289ec6cf66
1 changed files with 9 additions and 2 deletions

View File

@ -923,13 +923,20 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
taosThreadMutexLock(&pTask->lock);
streamTaskClearCheckInfo(pTask, true);
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
SStreamTaskState *pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) {
tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d",
pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
streamTaskClearCheckInfo(pTask, true);
streamTaskSetStatusReady(pTask);
} else if (pState->state == TASK_STATUS__UNINIT) {
tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
ASSERT(pTask->status.downstreamReady == 0);
/*int32_t ret = */ streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
} else {
tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name);
}
taosThreadMutexUnlock(&pTask->lock);