From 95469124f8ac2b9fac2c801b74798dccc936bf03 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 3 Jul 2024 06:47:52 +0000 Subject: [PATCH] fix stream restart crash --- source/libs/stream/src/streamMeta.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 958c3bc00f..2244861bc7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -273,7 +273,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) pBackend->pTask = pTask; pBackend->pMeta = pMeta; - pTask->chkInfo.processedVer = processVer; + if (processVer != -1) pTask->chkInfo.processedVer = processVer; taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); taosThreadMutexUnlock(&pMeta->backendMutex); @@ -905,7 +905,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (p == NULL) { code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); if (code < 0) { - stError("failed to load s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno)); + stError("failed to load s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno)); tFreeStreamTask(pTask); continue; } @@ -990,7 +990,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount); stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", - vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount); + vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount); // wait for the stream meta hb function stopping streamMetaWaitForHbTmrQuit(pMeta); @@ -1175,7 +1175,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { int64_t now = taosGetTimestampMs(); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%"PRId64, vgId, numOfTasks, now); + stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now); if (numOfTasks == 0) { stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);