diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a6715dbcff..adb587fdf8 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -262,9 +262,13 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) { int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { taosWLockLatch(&pMeta->lock); + int64_t keys[2]; for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { - uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); - SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId)); + SStreamId* pId = taosArrayGet(pMeta->pTaskList, i); + keys[0] = pId->streamId; + keys[1] = pId->taskId; + + SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); int8_t prev = p->status.taskStatus; ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7a77b98db6..7efc11399d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -485,6 +485,12 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) { pTask->status.stage += 1; streamSetStatusNormal(pTask); + + taosWLockLatch(&pTask->pMeta->lock); + streamMetaSaveTask(pTask->pMeta, pTask); + streamMetaCommit(pTask->pMeta); + taosWUnLockLatch(&pTask->pMeta->lock); + qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id, pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus));