From 4afe603c6e4df0b43e55b15e6046e2b3c6bb0a29 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 11 Aug 2023 14:17:09 +0800 Subject: [PATCH] fix(stream): fix error. --- source/libs/stream/src/streamCheckpoint.c | 8 ++++++-- source/libs/stream/src/streamTask.c | 6 ++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a6715dbcff..adb587fdf8 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -262,9 +262,13 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) { int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { taosWLockLatch(&pMeta->lock); + int64_t keys[2]; for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { - uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); - SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId)); + SStreamId* pId = taosArrayGet(pMeta->pTaskList, i); + keys[0] = pId->streamId; + keys[1] = pId->taskId; + + SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); int8_t prev = p->status.taskStatus; ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7a77b98db6..7efc11399d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -485,6 +485,12 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) { pTask->status.stage += 1; streamSetStatusNormal(pTask); + + taosWLockLatch(&pTask->pMeta->lock); + streamMetaSaveTask(pTask->pMeta, pTask); + streamMetaCommit(pTask->pMeta); + taosWUnLockLatch(&pTask->pMeta->lock); + qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id, pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus));