From 848b1e19b5c740069ceb057a637d08aa39b8eb79 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 14:39:46 +0800 Subject: [PATCH 1/9] fix(stream): fix some typo. --- source/dnode/vnode/src/tq/tqUtil.c | 5 +++-- source/libs/stream/src/streamMeta.c | 12 +++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 33e3414a7d..642de6c3f8 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -605,12 +605,13 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b numOfTasks = taosArrayGetSize(pMeta->pTaskList); for (int32_t i = 0; i < numOfTasks; ++i) { - STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); if (pId->streamId != streamId) { continue; } - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL) { tqError("vgId:%d failed to acquire task:0x%" PRIx64 " in retrieving progress", pMeta->vgId, pId->taskId); continue; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d0f9d40469..3c50156fad 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1007,9 +1007,10 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); for (int32_t i = 0; i < numOfTasks; ++i) { - STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (pTask == NULL) { continue; } @@ -1020,7 +1021,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { } STaskStatusEntry entry = { - .id = *pId, + .id = id, .status = streamTaskGetStatus(*pTask)->state, .nodeId = hbMsg.vgId, .stage = pMeta->stage, @@ -1508,8 +1509,9 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { int32_t num = taosArrayGetSize(pMeta->pTaskList); for (int32_t i = 0; i < num; ++i) { - STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pTaskId, sizeof(*pTaskId)); + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL) { continue; } From 3aa6e00bee01b22d3689b1e65db67f731645bd8d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 15:58:59 +0800 Subject: [PATCH 2/9] fix(stream): fix some typo. --- source/dnode/vnode/src/tq/tqUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 642de6c3f8..5290c39d42 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -613,7 +613,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL) { - tqError("vgId:%d failed to acquire task:0x%" PRIx64 " in retrieving progress", pMeta->vgId, pId->taskId); + tqError("vgId:%d failed to acquire task:0x%x in retrieving progress", pMeta->vgId, pId->taskId); continue; } From bdbcdf34655b9f56e1ce5db787e3f283cae5cd99 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 16:58:58 +0800 Subject: [PATCH 3/9] refactor: add two assert. --- source/libs/stream/src/streamMeta.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 3c50156fad..8433531f71 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -744,10 +744,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t pTask = *ppTask; // it is an fill-history task, remove the related stream task's id that points to it - atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); + if (pTask->info.fillHistory == 0) { + atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); + } + + ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); + + ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); streamMetaRemoveTask(pMeta, &id); streamMetaWUnLock(pMeta); From 0f4ffdf49a96ab594f7fce16189a89dbf2aae474 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 18:57:31 +0800 Subject: [PATCH 4/9] fix(stream): add some info. --- source/libs/stream/src/streamMeta.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8433531f71..bca0f06d5d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -673,13 +673,21 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) } static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* id) { + bool remove = false; for (int32_t i = 0; i < num; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) { taosArrayRemove(pMeta->pTaskList, i); + stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x", pMeta->vgId, id->streamId, id->taskId); + remove = true; break; + } else { + stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x, entry:0x%" PRIx64 "-0x%x", pMeta->vgId, id->streamId, + id->taskId, pTaskId->streamId, pTaskId->taskId); } } + + ASSERT(remove); } static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { From 60fac803459c81fdce2635c8bcd0eec08e14cdad Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 19:34:09 +0800 Subject: [PATCH 5/9] fix(stream): add some logs. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- source/libs/stream/src/streamMeta.c | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c27249cff6..d9bc8b74d2 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -635,8 +635,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen // drop the related fill-history task firstly if (hTaskId.taskId != 0 && hTaskId.streamId != 0) { - streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId); + streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); } // drop the stream task now diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index bca0f06d5d..df22dff97f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -677,8 +677,8 @@ static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* i for (int32_t i = 0; i < num; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) { + stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x succ", pMeta->vgId, id->streamId, id->taskId); taosArrayRemove(pMeta->pTaskList, i); - stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x", pMeta->vgId, id->streamId, id->taskId); remove = true; break; } else { @@ -723,7 +723,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } streamMetaWUnLock(pMeta); - stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId); + stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, pMeta->vgId); while (1) { streamMetaRLock(pMeta); @@ -750,6 +750,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; + ASSERT(pTask->id.taskId == id.taskId && pTask->id.streamId == id.streamId); // it is an fill-history task, remove the related stream task's id that points to it if (pTask->info.fillHistory == 0) { From f57ff9640884e53834e993d90229d00f70c40922 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 19:56:18 +0800 Subject: [PATCH 6/9] fix(stream): do some refactor. --- source/libs/stream/src/streamMeta.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index df22dff97f..05c414078d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -750,7 +750,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; - ASSERT(pTask->id.taskId == id.taskId && pTask->id.streamId == id.streamId); + SStreamTaskId pxId = pTask->id; + ASSERT((pxId.taskId == id.taskId) && (pxId.streamId == id.streamId)); // it is an fill-history task, remove the related stream task's id that points to it if (pTask->info.fillHistory == 0) { From c4a27569076a7d5ba418e3c4acec88a013105205 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 23:13:54 +0800 Subject: [PATCH 7/9] fix(stream): not revise the stream id for fill-history task. --- include/libs/stream/streamState.h | 3 ++- source/dnode/snode/src/snode.c | 6 ----- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 31 ++++++---------------- source/libs/stream/src/streamMeta.c | 24 +++++------------ source/libs/stream/src/streamState.c | 7 ++--- 6 files changed, 21 insertions(+), 52 deletions(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index ae5a733ae9..1333257dfb 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -29,7 +29,8 @@ extern "C" { #include "storageapi.h" -SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); +SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId, bool specPath, + int32_t szPage, int32_t pages); void streamStateClose(SStreamState* pState, bool remove); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index d322eb2977..481033508b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -25,12 +25,6 @@ #define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0) // clang-format on -static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) { - ASSERT(pTask->info.fillHistory); - pTask->id.taskId = pId->taskId; - pTask->id.streamId = pId->streamId; -} - int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0); int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 3cc7c6ec66..ac5463e492 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -299,7 +299,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id); pStreamTask->status.pSM = streamCreateStateMachine(pStreamTask); pStreamTask->chkInfo.pActiveInfo = streamTaskCreateActiveChkptInfo(); - pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); + pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId, true, -1, -1); if (!pStreamState) { terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index d9bc8b74d2..963503c135 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -30,37 +30,26 @@ typedef struct SMStreamCheckpointReadyRspMsg { static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); -static STaskId replaceStreamTaskId(SStreamTask* pTask) { - ASSERT(pTask->info.fillHistory); - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - - pTask->id.streamId = pTask->streamTaskId.streamId; - pTask->id.taskId = pTask->streamTaskId.taskId; - - return id; -} - -static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) { - ASSERT(pTask->info.fillHistory); - pTask->id.taskId = pId->taskId; - pTask->id.streamId = pId->streamId; -} - int32_t tqExpandStreamTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; int32_t vgId = pMeta->vgId; - STaskId taskId = {0}; int64_t st = taosGetTimestampMs(); + int64_t streamId = 0; + int32_t taskId = 0; tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId); if (pTask->info.fillHistory) { - taskId = replaceStreamTaskId(pTask); + streamId = pTask->streamTaskId.streamId; + taskId = pTask->streamTaskId.taskId; + } else { + streamId = pTask->id.streamId; + taskId = pTask->id.taskId; } // sink task does not need the pState if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - pTask->pState = streamStateOpen(pMeta->path, pTask, false, -1, -1); + pTask->pState = streamStateOpen(pMeta->path, pTask, false, streamId, taskId, -1, -1); if (pTask->pState == NULL) { tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId); return -1; @@ -69,10 +58,6 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { } } - if (pTask->info.fillHistory) { - restoreStreamTaskId(pTask, &taskId); - } - SReadHandle handle = { .checkpointId = pTask->chkInfo.checkpointId, .pStateBackend = pTask->pState, diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 05c414078d..e8800c3370 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -608,6 +608,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } taosArrayPush(pMeta->pTaskList, &pTask->id); + taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); if (streamMetaSaveTask(pMeta, pTask) < 0) { return -1; @@ -617,7 +618,6 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa return -1; } - taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); if (pTask->info.fillHistory == 0) { atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); } @@ -672,21 +672,16 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) } } -static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* id) { +static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id) { bool remove = false; for (int32_t i = 0; i < num; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) { - stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x succ", pMeta->vgId, id->streamId, id->taskId); - taosArrayRemove(pMeta->pTaskList, i); + taosArrayRemove(pTaskList, i); remove = true; break; - } else { - stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x, entry:0x%" PRIx64 "-0x%x", pMeta->vgId, id->streamId, - id->taskId, pTaskId->streamId, pTaskId->taskId); } } - ASSERT(remove); } @@ -750,26 +745,19 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; - SStreamTaskId pxId = pTask->id; - ASSERT((pxId.taskId == id.taskId) && (pxId.streamId == id.streamId)); - // it is an fill-history task, remove the related stream task's id that points to it if (pTask->info.fillHistory == 0) { atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); } - ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); - taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); - doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); - - ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); + doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); streamMetaRemoveTask(pMeta, &id); + ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); streamMetaWUnLock(pMeta); ASSERT(pTask->status.timerActive == 0); - if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); taosTmrStop(pTask->schedInfo.pDelayTimer); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 47324bd8c9..8919bd48ac 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -98,7 +98,8 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { return winKeyCmprImpl(&pWin1->key, &pWin2->key); } -SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) { +SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId, bool specPath, + int32_t szPage, int32_t pages) { SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); stDebug("open stream state %p, %s", pState, path); if (pState == NULL) { @@ -114,8 +115,8 @@ SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int3 } SStreamTask* pStreamTask = pTask; - pState->taskId = pStreamTask->id.taskId; - pState->streamId = pStreamTask->id.streamId; + pState->streamId = streamId; + pState->taskId = taskId; sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-0x%x", pState->streamId, pState->taskId); streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr); From 985d6195699c581f401f990ae980c088b8df6140 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 23:14:33 +0800 Subject: [PATCH 8/9] fix(stream): fix syntax error. --- include/libs/executor/storageapi.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 330ba31c65..8c67f77adb 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -410,7 +410,7 @@ typedef struct SStateStore { void (*streamFileStateClear)(struct SStreamFileState* pFileState); bool (*needClearDiskBuff)(struct SStreamFileState* pFileState); - SStreamState* (*streamStateOpen)(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); + SStreamState* (*streamStateOpen)(const char* path, void* pTask, int64_t streamId, int32_t taskId, bool specPath, int32_t szPage, int32_t pages); void (*streamStateClose)(SStreamState* pState, bool remove); int32_t (*streamStateBegin)(SStreamState* pState); int32_t (*streamStateCommit)(SStreamState* pState); From bca3cf4e9ee6e698ba8bd99efd15a52daef8882c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 23:51:25 +0800 Subject: [PATCH 9/9] fix(stream): fix error in unit test. --- source/libs/stream/test/backendTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index dc506cfbc9..e6a508f6f7 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -46,7 +46,7 @@ SStreamState *stateCreate(const char *path) { SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL); pTask->pMeta = pMeta; - SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024); + SStreamState *p = streamStateOpen((char *)path, pTask, 0, 0, true, 32, 32 * 1024); ASSERT(p != NULL); return p; }