From 81f96603b071d71c63ed8c0e57e7c5b2cf8c86e9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Dec 2023 09:33:51 +0800 Subject: [PATCH 1/6] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 3 +- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 16 ++--- source/libs/stream/src/streamMeta.c | 65 ++++--------------- 4 files changed, 20 insertions(+), 66 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8f3e100db6..3c475d0a03 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -840,11 +840,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int3 int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaReopen(SStreamMeta* pMeta); +void streamMetaClear(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); -int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta); int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 2ab710176d..50f413bcc9 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -181,5 +181,5 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) } int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { - return streamMetaReloadAllTasks(pWriter->pTq->pStreamMeta); + return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta); } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index b1d49bf31b..a761d15eff 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -712,9 +712,9 @@ int32_t resetStreamTaskStatus(SStreamMeta* pMeta) { } static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { - int32_t vgId = pMeta->vgId; - int32_t code = 0; - int64_t st = taosGetTimestampMs(); + int32_t vgId = pMeta->vgId; + int32_t code = 0; + int64_t st = taosGetTimestampMs(); while(1) { int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); @@ -736,17 +736,9 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } streamMetaWLock(pMeta); - code = streamMetaReopen(pMeta); - if (code != TSDB_CODE_SUCCESS) { - tqError("vgId:%d failed to reopen stream meta", vgId); - streamMetaWUnLock(pMeta); - code = terrno; - return code; - } + streamMetaClear(pMeta); - streamMetaInitBackend(pMeta); int64_t el = taosGetTimestampMs() - st; - tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); code = streamMetaLoadAllTasks(pMeta); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 807f120cb7..23cb6f5a35 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -31,7 +31,6 @@ int32_t streamMetaId = 0; int32_t taskDbWrapperId = 0; static void metaHbToMnode(void* param, void* tmrId); -static void streamMetaClear(SStreamMeta* pMeta); static int32_t streamMetaBegin(SStreamMeta* pMeta); static void streamMetaCloseImpl(void* arg); @@ -395,41 +394,6 @@ _err: return NULL; } -int32_t streamMetaReopen(SStreamMeta* pMeta) { - streamMetaClear(pMeta); - - // NOTE: role should not be changed during reopen meta - pMeta->streamBackendRid = -1; - pMeta->streamBackend = NULL; - - char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128); - sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); - taosRemoveDir(defaultPath); - - char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128); - sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received"); - - int32_t code = taosStatFile(newPath, NULL, NULL, NULL); - if (code == 0) { - // directory exists - code = taosRenameFile(newPath, defaultPath); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - stError("vgId:%d failed to rename file, from %s to %s, code:%s", pMeta->vgId, newPath, defaultPath, - tstrerror(terrno)); - - taosMemoryFree(defaultPath); - taosMemoryFree(newPath); - return -1; - } - } - - taosMemoryFree(defaultPath); - taosMemoryFree(newPath); - - return 0; -} - // todo refactor: the lock shoud be restricted in one function void streamMetaInitBackend(SStreamMeta* pMeta) { pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); @@ -829,28 +793,27 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { taosArrayDestroy(pRecycleList); } -int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta) { - if (pMeta == NULL) return 0; - - return streamMetaLoadAllTasks(pMeta); -} int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; - int32_t vgId = pMeta->vgId; - - stInfo("vgId:%d load stream tasks from meta files", vgId); - - if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { - stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno)); - return -1; - } - void* pKey = NULL; int32_t kLen = 0; void* pVal = NULL; int32_t vLen = 0; SDecoder decoder; - SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId)); + + if (pMeta == NULL) { + return TSDB_CODE_SUCCESS; + } + + SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId)); + int32_t vgId = pMeta->vgId; + stInfo("vgId:%d load stream tasks from meta files", vgId); + + if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { + stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno)); + taosArrayDestroy(pRecycleList); + return -1; + } tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { From 8e042e34cb01651c4a68538564917d65fa01030b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Dec 2023 09:48:14 +0800 Subject: [PATCH 2/6] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 54 ++++++++++++++++------------------ 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ee76a27414..4834924fe0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -699,7 +699,23 @@ end: return ret; } -void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } +static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } + +static STaskId replaceStreamTaskId(SStreamTask* pTask) { + ASSERT(pTask->info.fillHistory); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + + pTask->id.streamId = pTask->streamTaskId.streamId; + pTask->id.taskId = pTask->streamTaskId.taskId; + + return id; +} + +static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) { + ASSERT(pTask->info.fillHistory); + pTask->streamTaskId.taskId = pId->taskId; + pTask->streamTaskId.streamId = pId->streamId; +} int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { int32_t vgId = TD_VID(pTq->pVnode); @@ -713,15 +729,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { streamTaskOpenAllUpstreamInput(pTask); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - SStreamTask* pStateTask = pTask; - - STaskId taskId = {.streamId = 0, .taskId = 0}; + STaskId taskId = {0}; if (pTask->info.fillHistory) { - taskId.streamId = pTask->id.streamId; - taskId.taskId = pTask->id.taskId; - - pTask->id.streamId = pTask->streamTaskId.streamId; - pTask->id.taskId = pTask->streamTaskId.taskId; + taskId = replaceStreamTaskId(pTask); } pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -731,9 +741,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { } else { tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } + if (pTask->info.fillHistory) { - pTask->id.streamId = taskId.streamId; - pTask->id.taskId = taskId.taskId; + restoreStreamTaskId(pTask, &taskId); } SReadHandle handle = { @@ -754,15 +764,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - SStreamTask* pSateTask = pTask; - // SStreamTask task = {0}; - - STaskId taskId = {.streamId = 0, .taskId = 0}; + STaskId taskId = {0}; if (pTask->info.fillHistory) { - taskId.streamId = pTask->id.streamId; - taskId.taskId = pTask->id.taskId; - pTask->id.streamId = pTask->streamTaskId.streamId; - pTask->id.taskId = pTask->streamTaskId.taskId; + taskId = replaceStreamTaskId(pTask); } pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -774,15 +778,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { } if (pTask->info.fillHistory) { - pTask->id.streamId = taskId.streamId; - pTask->id.taskId = taskId.taskId; + restoreStreamTaskId(pTask, &taskId); } - int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); SReadHandle handle = { .checkpointId = pTask->chkInfo.checkpointId, .vnode = NULL, - .numOfVgroups = numOfVgroups, + .numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory, .winRange = pTask->dataRange.window, @@ -828,12 +830,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId); } - // // reset the task status from unfinished transaction - // if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { - // tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr); - // pTask->status.taskStatus = TASK_STATUS__READY; - // } - streamTaskResetUpstreamStageInfo(pTask); streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; From 239b2d4f006c7d9bd81a08f7f1caef335a6b1eaf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Dec 2023 09:51:27 +0800 Subject: [PATCH 3/6] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3c475d0a03..f6737b4e27 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -509,11 +509,8 @@ typedef struct SStreamMeta { SArray* chkpSaved; SArray* chkpInUse; SRWLatch chkpDirLock; - - void* qHandle; - int32_t pauseTaskNum; - - void* bkdChkptMgt; + void* qHandle; + void* bkdChkptMgt; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); From f0d63a977a913c20f956921746f11d09c446b80f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Dec 2023 10:01:57 +0800 Subject: [PATCH 4/6] refactor: do some internal refactor. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a761d15eff..a106fe148b 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -750,10 +750,10 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } if (isLeader && !tsDisableStream) { - tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); resetStreamTaskStatus(pMeta); - streamMetaWUnLock(pMeta); + tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); + startStreamTasks(pMeta); } else { streamMetaResetStartInfo(&pMeta->startInfo); From 38d7ae3cd7206ead35e30699586ad39cf9a2e3c3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Dec 2023 10:36:15 +0800 Subject: [PATCH 5/6] fix(query): add one more row for table rows distributed --- source/libs/function/src/builtinsimpl.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 25a3c509a7..93f3b6c109 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -5466,13 +5466,12 @@ bool blockDistSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { } int32_t blockDistFunction(SqlFunctionCtx* pCtx) { - const int32_t BLOCK_DIST_RESULT_ROWS = 24; + const int32_t BLOCK_DIST_RESULT_ROWS = 25; SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - STableBlockDistInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo); + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + STableBlockDistInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo); STableBlockDistInfo p1 = {0}; tDeserializeBlockDistInfo(varDataVal(pInputCol->pData), varDataLen(pInputCol->pData), &p1); From 7cf0add513d812c687a83bb1712ef53830d2a234 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Dec 2023 11:28:03 +0800 Subject: [PATCH 6/6] fix(stream): fix error caused by refactor. --- source/dnode/vnode/src/tq/tq.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4834924fe0..138c58b45f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -713,8 +713,8 @@ static STaskId replaceStreamTaskId(SStreamTask* pTask) { static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) { ASSERT(pTask->info.fillHistory); - pTask->streamTaskId.taskId = pId->taskId; - pTask->streamTaskId.streamId = pId->streamId; + pTask->id.taskId = pId->taskId; + pTask->id.streamId = pId->streamId; } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {