From d799212fb2a1973291489425fea9fcde2493da3b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 12 Apr 2024 10:02:18 +0800 Subject: [PATCH 01/16] refactor: do some internal refactor. --- include/dnode/vnode/tqCommon.h | 2 + source/dnode/snode/inc/sndInt.h | 1 - source/dnode/snode/src/snode.c | 42 ++--------- source/dnode/vnode/src/tq/tq.c | 88 +--------------------- source/dnode/vnode/src/tqCommon/tqCommon.c | 61 +++++++++++++++ 5 files changed, 72 insertions(+), 122 deletions(-) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 22a176f0bb..93e0064192 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -40,4 +40,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); +int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode); + #endif // TDENGINE_TQ_COMMON_H diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 024c3c6bae..8c5d056893 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -31,7 +31,6 @@ extern "C" { #endif struct SSnode { - char* path; SStreamMeta* pMeta; SMsgCb msgCb; }; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 3bef5b595b..bd07974c3f 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -32,6 +32,7 @@ static STaskId replaceStreamTaskId(SStreamTask *pTask) { pTask->id.taskId = pTask->streamTaskId.taskId; return id; } + static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) { ASSERT(pTask->info.fillHistory); pTask->id.taskId = pId->taskId; @@ -48,42 +49,16 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer streamTaskOpenAllUpstreamInput(pTask); - STaskId taskId = {0}; - if (pTask->info.fillHistory) { - taskId = replaceStreamTaskId(pTask); + code = tqExpandStreamTask(pTask, pSnode->pMeta, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; } - pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); - if (pTask->pState == NULL) { - sndError("s-task:%s failed to open state for task", pTask->id.idStr); - return -1; - } else { - sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); - } - - if (pTask->info.fillHistory) { - restoreStreamTaskId(pTask, &taskId); - } - - int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); - SReadHandle handle = { - .checkpointId = pTask->chkInfo.checkpointId, - .vnode = NULL, - .numOfVgroups = numOfVgroups, - .pStateBackend = pTask->pState, - .fillHistory = pTask->info.fillHistory, - .winRange = pTask->dataRange.window, - }; - initStreamStateAPI(&handle.api); - - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, SNODE_HANDLE, pTask->id.taskId); - ASSERT(pTask->exec.pExecutor); - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); - streamTaskResetUpstreamStageInfo(pTask); streamSetupScheduleTrigger(pTask); SCheckpointInfo *pChkInfo = &pTask->chkInfo; + // checkpoint ver is the kept version, handled data should be the next version. if (pChkInfo->checkpointId != 0) { pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; @@ -117,11 +92,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pSnode->path = taosStrdup(path); - if (pSnode->path == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; - } pSnode->msgCb = pOption->msgCb; pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback); @@ -140,7 +110,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { return pSnode; FAIL: - taosMemoryFree(pSnode->path); taosMemoryFree(pSnode); return NULL; } @@ -156,7 +125,6 @@ void sndClose(SSnode *pSnode) { streamMetaNotifyClose(pSnode->pMeta); streamMetaCommit(pSnode->pMeta); streamMetaClose(pSnode->pMeta); - taosMemoryFree(pSnode->path); taosMemoryFree(pSnode); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7886967be0..fb47414fb9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -699,22 +699,6 @@ end: static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } -static STaskId replaceStreamTaskId(SStreamTask* pTask) { - ASSERT(pTask->info.fillHistory); - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - - pTask->id.streamId = pTask->streamTaskId.streamId; - pTask->id.taskId = pTask->streamTaskId.taskId; - - return id; -} - -static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) { - ASSERT(pTask->info.fillHistory); - pTask->id.taskId = pId->taskId; - pTask->id.streamId = pId->streamId; -} - int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { int32_t vgId = TD_VID(pTq->pVnode); tqDebug("s-task:0x%x start to expand task", pTask->id.taskId); @@ -724,74 +708,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { return code; } - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - STaskId taskId = {0}; - if (pTask->info.fillHistory) { - taskId = replaceStreamTaskId(pTask); - } - - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); - if (pTask->pState == NULL) { - tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId); - return -1; - } - - tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); - if (pTask->info.fillHistory) { - restoreStreamTaskId(pTask, &taskId); - } - - SReadHandle handle = { - .checkpointId = pTask->chkInfo.checkpointId, - .vnode = pTq->pVnode, - .initTqReader = 1, - .pStateBackend = pTask->pState, - .fillHistory = pTask->info.fillHistory, - .winRange = pTask->dataRange.window, - }; - - initStorageAPI(&handle.api); - - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); - if (pTask->exec.pExecutor == NULL) { - return -1; - } - - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); - } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - STaskId taskId = {0}; - if (pTask->info.fillHistory) { - taskId = replaceStreamTaskId(pTask); - } - - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); - if (pTask->pState == NULL) { - tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId); - return -1; - } else { - tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); - } - - if (pTask->info.fillHistory) { - restoreStreamTaskId(pTask, &taskId); - } - - SReadHandle handle = { - .checkpointId = pTask->chkInfo.checkpointId, - .vnode = NULL, - .numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), - .pStateBackend = pTask->pState, - .fillHistory = pTask->info.fillHistory, - .winRange = pTask->dataRange.window, - }; - - initStorageAPI(&handle.api); - - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); - if (pTask->exec.pExecutor == NULL) { - return -1; - } - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); + code = tqExpandStreamTask(pTask, pTq->pStreamMeta, pTq->pVnode); + if (code != TSDB_CODE_SUCCESS) { + return code; } // sink @@ -827,6 +746,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { streamTaskResetUpstreamStageInfo(pTask); streamSetupScheduleTrigger(pTask); + SCheckpointInfo* pChkInfo = &pTask->chkInfo; // checkpoint ver is the kept version, handled data should be the next version. diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 2fa9f9a9ff..f85bb8cee5 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -23,6 +23,67 @@ typedef struct STaskUpdateEntry { int32_t transId; } STaskUpdateEntry; +static STaskId replaceStreamTaskId(SStreamTask* pTask) { + ASSERT(pTask->info.fillHistory); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + + pTask->id.streamId = pTask->streamTaskId.streamId; + pTask->id.taskId = pTask->streamTaskId.taskId; + + return id; +} + +static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) { + ASSERT(pTask->info.fillHistory); + pTask->id.taskId = pId->taskId; + pTask->id.streamId = pId->streamId; +} + +int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) { + int32_t vgId = pMeta->vgId; + STaskId taskId = {0}; + + if (pTask->info.fillHistory) { + taskId = replaceStreamTaskId(pTask); + } + + pTask->pState = streamStateOpen(pMeta->path, pTask, false, -1, -1); + if (pTask->pState == NULL) { + tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId); + return -1; + } else { + tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); + } + + if (pTask->info.fillHistory) { + restoreStreamTaskId(pTask, &taskId); + } + + SReadHandle handle = { + .checkpointId = pTask->chkInfo.checkpointId, + .pStateBackend = pTask->pState, + .fillHistory = pTask->info.fillHistory, + .winRange = pTask->dataRange.window, + }; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + handle.vnode = pVnode; + handle.initTqReader = 1; + } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { + handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); + } + + initStorageAPI(&handle.api); + + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); + if (pTask->exec.pExecutor == NULL) { + tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr); + return -1; + } + + qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); + return TSDB_CODE_SUCCESS; +} + int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { int32_t vgId = pMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); From 15c18af221948986b94f559d689abb238339c552 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 13 Apr 2024 18:27:38 +0800 Subject: [PATCH 02/16] fix(stream): fix init error. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 12 +++++++----- source/libs/stream/src/streamMeta.c | 3 --- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index f85bb8cee5..1c3a760bab 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -74,13 +74,15 @@ int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) initStorageAPI(&handle.api); - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); - if (pTask->exec.pExecutor == NULL) { - tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr); - return -1; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG) { + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); + if (pTask->exec.pExecutor == NULL) { + tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr); + return -1; + } + qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8d5e4f3c87..ea18e791a6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -591,19 +591,16 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { - tFreeStreamTask(pTask); return -1; } taosArrayPush(pMeta->pTaskList, &pTask->id); if (streamMetaSaveTask(pMeta, pTask) < 0) { - tFreeStreamTask(pTask); return -1; } if (streamMetaCommit(pMeta) < 0) { - tFreeStreamTask(pTask); return -1; } From 852052a99beb29bad066e891eb1f2ede50ac4c51 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 15 Apr 2024 07:47:07 +0800 Subject: [PATCH 03/16] fix: extracted data block shall initialize pks of datablock info --- source/common/src/tdatablock.c | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 7a45e44eab..842038e41f 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -653,25 +653,15 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 return NULL; } - SSDataBlock* pDst = createDataBlock(); + SSDataBlock* pDst = createOneDataBlock(pBlock, false); if (pDst == NULL) { return NULL; } - pDst->info = pBlock->info; - pDst->info.rows = 0; - pDst->info.capacity = 0; - pDst->info.rowSize = 0; - size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData colInfo = {0}; - SColumnInfoData* pSrcCol = taosArrayGet(pBlock->pDataBlock, i); - colInfo.info = pSrcCol->info; - blockDataAppendColInfo(pDst, &colInfo); - } blockDataEnsureCapacity(pDst, rowCount); + size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); From 31a728b1b83de9f3563f648fae722da2bc9edcc4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Apr 2024 09:49:41 +0800 Subject: [PATCH 04/16] fix(stream): update the stream task meta table. --- source/common/src/systable.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index e0c0cc89ab..80b3efb05d 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -180,7 +180,12 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, // {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, + {.name = "checkpointId", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpointInfo", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema userTblsSchema[] = { From 5f28ee0e525812b48b7d8914993fc25c760db6f6 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 15 Apr 2024 11:28:09 +0800 Subject: [PATCH 05/16] fix: wrongly changed the column type of pk col during last row scan optimize --- source/libs/planner/src/planOptimizer.c | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index b3f50b540b..d5a25fb517 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2997,15 +2997,9 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic if (FUNCTION_TYPE_LAST == funcType) { nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt); nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1)); - if (pFunc->hasPk) { - if (LIST_LENGTH(pFunc->pParameterList) != 2) { - planError("last func which has pk but its parameter list length is not %d", 2); - nodesClearList(cxt.pLastCols); - taosArrayDestroy(isDuplicateCol); - return TSDB_CODE_PLAN_INTERNAL_ERROR; - } - nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 1), lastRowScanOptSetColDataType, &cxt); - } + } + if (pFunc->hasPk) { + nodesListMakeAppend(&cxt.pOtherCols, nodesListGetNode(pFunc->pParameterList, LIST_LENGTH(pFunc->pParameterList) - 1)); } } else { pNode = nodesListGetNode(pFunc->pParameterList, 0); From 04f1cfaf572d562f6c5728ab18aa0982da982cb4 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 15 Apr 2024 13:56:56 +0800 Subject: [PATCH 06/16] feat: add test case for wrong pk data type --- tests/system-test/2-query/pk_func.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/system-test/2-query/pk_func.py b/tests/system-test/2-query/pk_func.py index 30b6671f98..af6530fa51 100644 --- a/tests/system-test/2-query/pk_func.py +++ b/tests/system-test/2-query/pk_func.py @@ -642,7 +642,12 @@ class TDTestCase: tdSql.checkData(7, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) tdSql.checkData(7, 1, 8) tdSql.checkData(7, 2, 8) - + + tdSql.query('select ts, last(pk) from d1.st order by pk') + tdSql.checkRows(1) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(0, 1, 8) + tdSql.execute('drop database pk_func') def stop(self): tdSql.close() From 665107ad3a439d0c8ec4d4b608d9843f7651b378 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Apr 2024 14:23:37 +0800 Subject: [PATCH 07/16] enh(stream): add more info in meta table. --- include/libs/stream/tstream.h | 53 ++++++++++------- source/common/src/systable.c | 10 +++- source/dnode/mnode/impl/src/mndStream.c | 69 ++++++++++++++++++++++- source/dnode/mnode/impl/src/mndStreamHb.c | 8 ++- source/dnode/snode/src/snode.c | 3 + source/dnode/vnode/src/tq/tq.c | 2 + source/libs/stream/src/streamCheckpoint.c | 1 + source/libs/stream/src/streamMeta.c | 54 +++++++++++++----- source/libs/stream/src/streamStart.c | 2 +- source/libs/stream/src/streamTask.c | 12 ++-- 10 files changed, 167 insertions(+), 47 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c12bb146b4..5cecb1af42 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -304,9 +304,9 @@ typedef struct SStreamTaskId { typedef struct SCheckpointInfo { int64_t startTs; - int64_t checkpointId; - - int64_t checkpointVer; // latest checkpointId version + int64_t checkpointId; // latest checkpoint id + int64_t checkpointVer; // latest checkpoint offset in wal + int64_t checkpointTime; // latest checkpoint time int64_t processedVer; int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t failedId; // record the latest failed checkpoint id @@ -386,6 +386,9 @@ typedef struct STaskExecStatisInfo { int64_t created; int64_t init; int64_t start; + int64_t startCheckpointId; + int64_t startCheckpointVer; + int64_t step1Start; double step1El; int64_t step2Start; @@ -672,24 +675,34 @@ typedef struct { int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp); int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp); +typedef struct STaskCkptInfo { + int64_t latestId; // saved checkpoint id + int64_t latestVer; // saved checkpoint ver + int64_t latestTime; // latest checkpoint time + int64_t activeId; // current active checkpoint id + int32_t activeTransId; // checkpoint trans id + int8_t failed; // denote if the checkpoint is failed or not +} STaskCkptInfo; + typedef struct STaskStatusEntry { - STaskId id; - int32_t status; - int32_t statusLastDuration; // to record the last duration of current status - int64_t stage; - int32_t nodeId; - int64_t verStart; // start version in WAL, only valid for source task - int64_t verEnd; // end version in WAL, only valid for source task - int64_t processedVer; // only valid for source task - int64_t checkpointId; // current active checkpoint id - int32_t chkpointTransId; // checkpoint trans id - int8_t checkpointFailed; // denote if the checkpoint is failed or not - bool inputQChanging; // inputQ is changing or not - int64_t inputQUnchangeCounter; - double inputQUsed; // in MiB - double inputRate; - double sinkQuota; // existed quota size for sink task - double sinkDataSize; // sink to dst data size + STaskId id; + int32_t status; + int32_t statusLastDuration; // to record the last duration of current status + int64_t stage; + int32_t nodeId; + SVersionRange verRange; // start/end version in WAL, only valid for source task + int64_t processedVer; // only valid for source task + bool inputQChanging; // inputQ is changing or not + int64_t inputQUnchangeCounter; + double inputQUsed; // in MiB + double inputRate; + double sinkQuota; // existed quota size for sink task + double sinkDataSize; // sink to dst data size + int64_t startTime; + int64_t startCheckpointId; + int64_t startCheckpointVer; + int64_t hTaskId; + STaskCkptInfo checkpointInfo; } STaskStatusEntry; typedef struct SStreamHbMsg { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 80b3efb05d..9f1509077c 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -159,6 +159,8 @@ static const SSysDbTableSchema userStbsSchema[] = { static const SSysDbTableSchema streamSchema[] = { {.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, + {.name = "stream_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "history_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, @@ -182,9 +184,13 @@ static const SSysDbTableSchema streamTaskSchema[] = { // {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, - {.name = "checkpointId", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, - {.name = "checkpointInfo", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "start_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "start_ver", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, + {.name = "checkpoint_id", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_version", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8f9afb2adc..05b06e83a8 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1304,9 +1304,31 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false); + // create time pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false); + // stream id + char buf[128] = {0}; + int32_t len = tintToHex(pStream->uid, &buf[4]); + buf[2] = '0'; + buf[3] = 'x'; + varDataSetLen(buf, len + 2); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, buf, false); + + // related fill-history stream id + memset(buf, 0, tListLen(buf)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pStream->hTaskUid != 0) { + len = tintToHex(pStream->hTaskUid, &buf[4]); + varDataSetLen(buf, len + 2); + colDataSetVal(pColInfo, numOfRows, buf, false); + } else { + colDataSetVal(pColInfo, numOfRows, buf, true); + } + + // related fill-history stream id char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -1469,13 +1491,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); // colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); + // info if (pTask->info.taskLevel == TASK_LEVEL__SINK) { const char *sinkStr = "%.2fMiB"; sprintf(buf, sinkStr, pe->sinkDataSize); } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; - sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd); + sprintf(buf, offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); } STR_TO_VARSTR(vbuf, buf); @@ -1483,6 +1506,50 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + // start_time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startTime, false); + + // start id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startCheckpointId, false); + + // start ver + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startCheckpointVer, false); + + // checkpoint time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pe->checkpointInfo.latestTime != 0) { + colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false); + } else { + colDataSetVal(pColInfo, numOfRows, 0, true); + } + + // checkpoint_id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestId, false); + + // checkpoint info + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false); + + // ds_err_info + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, 0, true); + + // history_task_id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pe->hTaskId != 0) { + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->hTaskId, false); + } else { + colDataSetVal(pColInfo, numOfRows, 0, true); + } + + // history_task_status + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, 0, true); + return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index c8f943b931..1fedee3bcf 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -294,12 +294,14 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } streamTaskStatusCopy(pTaskEntry, p); - if ((p->checkpointId != 0) && p->checkpointFailed) { + + STaskCkptInfo *pChkInfo = &p->checkpointInfo; + if ((pChkInfo->activeId != 0) && pChkInfo->failed) { mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId, - p->checkpointId, p->chkpointTransId); + pChkInfo->activeId, pChkInfo->activeTransId); SFailedCheckpointInfo info = { - .transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId}; + .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId}; addIntoCheckpointList(pFailedTasks, &info); } } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index bd07974c3f..f17716eda0 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -63,6 +63,9 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer if (pChkInfo->checkpointId != 0) { pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; pChkInfo->processedVer = pChkInfo->checkpointVer; + pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer; + pTask->execInfo.startCheckpointId = pChkInfo->checkpointId; + sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fb47414fb9..92dc55c0c3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -753,6 +753,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { if (pChkInfo->checkpointId != 0) { pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; pChkInfo->processedVer = pChkInfo->checkpointVer; + pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer; + pTask->execInfo.startCheckpointId = pChkInfo->checkpointId; tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7f52c5d2f0..86ee2b837d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -309,6 +309,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { pCKInfo->checkpointId = pCKInfo->checkpointingId; pCKInfo->checkpointVer = pCKInfo->processedVer; + pCKInfo->checkpointTime = pCKInfo->startTs; streamTaskClearCheckInfo(p, false); taosThreadMutexUnlock(&p->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ea18e791a6..3c22f33f93 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -957,11 +957,18 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1; if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1; if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1; - if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1; - if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1; - if (tEncodeI64(pEncoder, ps->checkpointId) < 0) return -1; - if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1; - if (tEncodeI32(pEncoder, ps->chkpointTransId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->verRange.minVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->verRange.maxVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.activeId) < 0) return -1; + if (tEncodeI8(pEncoder, ps->checkpointInfo.failed) < 0) return -1; + if (tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1; + if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1; + if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->hTaskId) < 0) return -1; } int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes); @@ -996,11 +1003,19 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1; if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.checkpointId) < 0) return -1; - if (tDecodeI8(pDecoder, &entry.checkpointFailed) < 0) return -1; - if (tDecodeI32(pDecoder, &entry.chkpointTransId) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.verRange.minVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.verRange.maxVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.activeId) < 0) return -1; + if (tDecodeI8(pDecoder, &entry.checkpointInfo.failed) < 0) return -1; + if (tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId) < 0) return -1; + + if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1; entry.id.taskId = taskId; taosArrayPush(pReq->pTaskStatus, &entry); @@ -1102,7 +1117,16 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { .status = streamTaskGetStatus(*pTask)->state, .nodeId = hbMsg.vgId, .stage = pMeta->stage, + .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), + .startTime = (*pTask)->execInfo.start, + .checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId, + .checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer, + .checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime, + .hTaskId = (*pTask)->hTaskInfo.id.taskId, + + .startCheckpointId = (*pTask)->execInfo.startCheckpointId, + .startCheckpointVer = (*pTask)->execInfo.startCheckpointVer, }; entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); @@ -1112,11 +1136,11 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { } if ((*pTask)->chkInfo.checkpointingId != 0) { - entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0; - entry.checkpointId = (*pTask)->chkInfo.checkpointingId; - entry.chkpointTransId = (*pTask)->chkInfo.transId; + entry.checkpointInfo.failed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0; + entry.checkpointInfo.activeId = (*pTask)->chkInfo.checkpointingId; + entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.transId; - if (entry.checkpointFailed) { + if (entry.checkpointInfo.failed) { stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId); } } @@ -1127,7 +1151,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { entry.processedVer = (*pTask)->chkInfo.processedVer; } - walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd); + walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer); } addUpdateNodeIntoHbMsg(*pTask, &hbMsg); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index f2a694a554..32bd3742ad 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -44,7 +44,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId); static void doProcessDownstreamReadyRsp(SStreamTask* pTask); int32_t streamTaskSetReady(SStreamTask* pTask) { - int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); + int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); SStreamTaskState* p = streamTaskGetStatus(pTask); if ((p->state == TASK_STATUS__SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c34e162326..7badbfa9f3 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -849,13 +849,15 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) pDst->inputQUsed = pSrc->inputQUsed; pDst->inputRate = pSrc->inputRate; pDst->processedVer = pSrc->processedVer; - pDst->verStart = pSrc->verStart; - pDst->verEnd = pSrc->verEnd; + pDst->verRange = pSrc->verRange; pDst->sinkQuota = pSrc->sinkQuota; pDst->sinkDataSize = pSrc->sinkDataSize; - pDst->checkpointId = pSrc->checkpointId; - pDst->checkpointFailed = pSrc->checkpointFailed; - pDst->chkpointTransId = pSrc->chkpointTransId; + pDst->checkpointInfo = pSrc->checkpointInfo; + pDst->startCheckpointId = pSrc->startCheckpointId; + pDst->startCheckpointVer = pSrc->startCheckpointVer; + + pDst->startTime = pSrc->startTime; + pDst->hTaskId = pSrc->hTaskId; } static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { From 31f5b0dca76bf26f0bc961ef96cc3536403a299c Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 15 Apr 2024 14:49:47 +0800 Subject: [PATCH 08/16] fix: add test case for varchar primary key --- tests/parallel_test/cases.task | 1 + tests/system-test/2-query/pk_varchar.py | 306 ++++++++++++++++++++++++ 2 files changed, 307 insertions(+) create mode 100644 tests/system-test/2-query/pk_varchar.py diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 9b00075761..bdc6ef0984 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -47,6 +47,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/pk_error.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/pk_func.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/pk_varchar.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/pk_func_group.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_expr.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/project_group.py diff --git a/tests/system-test/2-query/pk_varchar.py b/tests/system-test/2-query/pk_varchar.py new file mode 100644 index 0000000000..167e1079d5 --- /dev/null +++ b/tests/system-test/2-query/pk_varchar.py @@ -0,0 +1,306 @@ +import sys +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import tdDnodes +from math import inf + +class TDTestCase: + def caseDescription(self): + ''' + case1: [TD-] + ''' + return + + def init(self, conn, logSql, replicaVer=1): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + self.conn = conn + + def restartTaosd(self, index=1, dbname="db"): + tdDnodes.stop(index) + tdDnodes.startWithoutSleep(index) + tdSql.execute(f"use pk_varchar") + + def run(self): + print("running {}".format(__file__)) + tdSql.execute("drop database if exists pk_varchar") + tdSql.execute("create database if not exists pk_varchar") + tdSql.execute('use pk_varchar') + tdSql.execute('drop database IF EXISTS d1;') + + tdSql.execute('drop database IF EXISTS d2;') + + tdSql.execute('create database d1 vgroups 1') + + tdSql.execute('use d1;') + + tdSql.execute('create table st(ts timestamp, pk varchar(256) primary key, f int) tags(t int);') + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:00', '1', 1);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:00', '2', 2);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:00', '3', 3);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:00', '4', 4);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', '1', 1);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', '4', 4);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:01', '3', 3);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:01', '2', 2);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:02', '6', 6);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:02', '5', 5);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:02', '8', 8);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:02', '7', 7);") + + tdSql.query('select first(*) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(0, 1, '1') + tdSql.checkData(0, 2, 1) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(1, 1, '3') + tdSql.checkData(1, 2, 3) + + tdSql.query('select last(*) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(0, 1, '6') + tdSql.checkData(0, 2, 6) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, '8') + tdSql.checkData(1, 2, 8) + + tdSql.query('select last_row(*) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(0, 1, '6') + tdSql.checkData(0, 2, 6) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, '8') + tdSql.checkData(1, 2, 8) + + tdSql.query('select ts,diff(f) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(4) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(0, 1, 0) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, 4) + tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(2, 1, -1) + tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(3, 1, 5) + + tdSql.query('select irate(f) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, 4.0) + tdSql.checkData(1, 0, 5.0) + + tdSql.query('select ts,derivative(f, 1s, 0) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(4) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(0, 1, 0.0) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, 4.0) + tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(2, 1, -1.0) + tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(3, 1, 5.0) + + tdSql.query('select twa(f) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, 2.0) + tdSql.checkData(1, 0, 3.5) + + tdSql.query('select ts,pk,unique(f) from d1.st partition by tbname order by tbname,ts,pk;') + tdSql.checkRows(10) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(0, 1, '1') + tdSql.checkData(0, 2, 1) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(1, 1, '2') + tdSql.checkData(1, 2, 2) + tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(2, 1, '4') + tdSql.checkData(2, 2, 4) + tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(3, 1, '5') + tdSql.checkData(3, 2, 5) + tdSql.checkData(4, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(4, 1, '6') + tdSql.checkData(4, 2, 6) + tdSql.checkData(5, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(5, 1, '3') + tdSql.checkData(5, 2, 3) + tdSql.checkData(6, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(6, 1, '4') + tdSql.checkData(6, 2, 4) + tdSql.checkData(7, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(7, 1, '2') + tdSql.checkData(7, 2, 2) + tdSql.checkData(8, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(8, 1, '7') + tdSql.checkData(8, 2, 7) + tdSql.checkData(9, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(9, 1, '8') + tdSql.checkData(9, 2, 8) + + tdSql.query('select * from d1.st order by ts limit 2;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(0, 1, '1') + tdSql.checkData(0, 2, 1) + tdSql.checkData(0, 3, 1) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(1, 1, '2') + tdSql.checkData(1, 2, 2) + tdSql.checkData(1, 3, 1) + + tdSql.execute('create database d2 vgroups 2') + + tdSql.execute('use d2;') + + tdSql.execute('create table st(ts timestamp, pk varchar(256) primary key, f int) tags(t int);') + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:00', '1', 1);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:00', '2', 2);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:00', '3', 3);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:00', '4', 4);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', '1', 1);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', '4', 4);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:01', '3', 3);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:01', '2', 2);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:02', '6', 6);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:02', '5', 5);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:02', '8', 8);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:02', '7', 7);") + + tdSql.query('select first(*) from d2.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(0, 1, '1') + tdSql.checkData(0, 2, 1) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(1, 1, '3') + tdSql.checkData(1, 2, 3) + + tdSql.query('select last(*) from d2.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(0, 1, '6') + tdSql.checkData(0, 2, 6) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, '8') + tdSql.checkData(1, 2, 8) + + tdSql.query('select last_row(*) from d2.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(0, 1, '6') + tdSql.checkData(0, 2, 6) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, '8') + tdSql.checkData(1, 2, 8) + + tdSql.query('select ts,diff(f) from d2.st partition by tbname order by tbname;') + tdSql.checkRows(4) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(0, 1, 0) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, 4) + tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(2, 1, -1) + tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(3, 1, 5) + + tdSql.query('select irate(f) from d2.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, 4.0) + tdSql.checkData(1, 0, 5.0) + + tdSql.query('select ts,derivative(f, 1s, 0) from d2.st partition by tbname order by tbname;') + tdSql.checkRows(4) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(0, 1, 0.0) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, 4.0) + tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(2, 1, -1.0) + tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(3, 1, 5.0) + + tdSql.query('select twa(f) from d2.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, 2.0) + tdSql.checkData(1, 0, 3.5) + + tdSql.query('select ts,pk,unique(f) from d2.st partition by tbname order by tbname,ts,pk;') + tdSql.checkRows(10) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(0, 1, '1') + tdSql.checkData(0, 2, 1) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(1, 1, '2') + tdSql.checkData(1, 2, 2) + tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(2, 1, '4') + tdSql.checkData(2, 2, 4) + tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(3, 1, '5') + tdSql.checkData(3, 2, 5) + tdSql.checkData(4, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(4, 1, '6') + tdSql.checkData(4, 2, 6) + tdSql.checkData(5, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(5, 1, '3') + tdSql.checkData(5, 2, 3) + tdSql.checkData(6, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(6, 1, '4') + tdSql.checkData(6, 2, 4) + tdSql.checkData(7, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(7, 1, '2') + tdSql.checkData(7, 2, 2) + tdSql.checkData(8, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(8, 1, '7') + tdSql.checkData(8, 2, 7) + tdSql.checkData(9, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(9, 1, '8') + tdSql.checkData(9, 2, 8) + + tdSql.query('select * from d2.st order by ts limit 2;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(0, 1, '1') + tdSql.checkData(0, 2, 1) + tdSql.checkData(0, 3, 1) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(1, 1, '2') + tdSql.checkData(1, 2, 2) + tdSql.checkData(1, 3, 1) + + tdSql.execute('drop database pk_varchar') + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) From ac2853f28c14ac5f43173c520c546be994e44674 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 15 Apr 2024 15:36:54 +0800 Subject: [PATCH 09/16] fix: drop table after commit --- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 79 ++++------------------- 1 file changed, 11 insertions(+), 68 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 700d6b10b7..db57ec9835 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -37,7 +37,6 @@ typedef struct { int64_t cid; int64_t now; TSKEY nextKey; - TSKEY maxDelKey; int32_t fid; int32_t expLevel; SDiskID did; @@ -46,7 +45,6 @@ typedef struct { STFileSet *fset; TABLEID tbid[1]; bool hasTSData; - bool skipTsRow; } ctx[1]; // reader @@ -128,21 +126,8 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { continue; } } - /* - extern int8_t tsS3Enabled; - int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs); - committer->ctx->skipTsRow = false; - if (tsS3Enabled && nlevel > 1 && committer->ctx->did.level == nlevel - 1) { - committer->ctx->skipTsRow = true; - } - */ int64_t ts = TSDBROW_TS(&row->row); - - if (committer->ctx->skipTsRow && ts <= committer->ctx->maxKey) { - ts = committer->ctx->maxKey + 1; - } - if (ts > committer->ctx->maxKey) { committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts); code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid); @@ -175,15 +160,13 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { int64_t numRecord = 0; SMetaInfo info; - if (committer->ctx->fset == NULL && !committer->ctx->hasTSData) { - if (committer->ctx->maxKey < committer->ctx->maxDelKey) { - committer->ctx->nextKey = committer->ctx->maxKey + 1; - } else { - committer->ctx->nextKey = TSKEY_MAX; - } - return 0; + if (committer->tsdb->imem->nDel == 0) { + goto _exit; } + // do not need to write tomb data if there is no ts data + bool skip = (committer->ctx->fset == NULL && !committer->ctx->hasTSData); + committer->ctx->tbid->suid = 0; committer->ctx->tbid->uid = 0; for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) { @@ -210,9 +193,11 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { record->skey = TMAX(record->skey, committer->ctx->minKey); record->ekey = TMIN(record->ekey, committer->ctx->maxKey); - numRecord++; - code = tsdbFSetWriteTombRecord(committer->writer, record); - TSDB_CHECK_CODE(code, lino, _exit); + if (!skip) { + numRecord++; + code = tsdbFSetWriteTombRecord(committer->writer, record); + TSDB_CHECK_CODE(code, lino, _exit); + } } code = tsdbIterMergerNext(committer->tombIterMerger); @@ -406,31 +391,6 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { // reset nextKey committer->ctx->nextKey = TSKEY_MAX; - committer->ctx->skipTsRow = false; - - extern int8_t tsS3Enabled; - extern int32_t tsS3UploadDelaySec; - long s3Size(const char *object_name); - int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs); - if (tsS3Enabled && nlevel > 1 && committer->ctx->fset) { - STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA]; - if (fobj && fobj->f->did.level == nlevel - 1) { - // if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay - const char *object_name = taosDirEntryBaseName((char *)fobj->fname); - - if (taosCheckExistFile(fobj->fname)) { - int32_t mtime = 0; - taosStatFile(fobj->fname, NULL, &mtime, NULL); - if (mtime < committer->ctx->now - tsS3UploadDelaySec) { - committer->ctx->skipTsRow = true; - } - } else /*if (s3Size(object_name) > 0) */ { - committer->ctx->skipTsRow = true; - } - } - // new fset can be written with ts data - } - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); @@ -519,28 +479,11 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co STbData *tbData = TCONTAINER_OF(node, STbData, rbtn); for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) { - if (delData->sKey < committer->ctx->nextKey) { - committer->ctx->nextKey = delData->sKey; - } + committer->ctx->nextKey = TMIN(committer->ctx->nextKey, delData->sKey); } } } - committer->ctx->maxDelKey = TSKEY_MIN; - TSKEY minKey = TSKEY_MAX; - TSKEY maxKey = TSKEY_MIN; - if (TARRAY2_SIZE(committer->fsetArr) > 0) { - STFileSet *fset = TARRAY2_LAST(committer->fsetArr); - tsdbFidKeyRange(fset->fid, committer->minutes, committer->precision, &minKey, &committer->ctx->maxDelKey); - - fset = TARRAY2_FIRST(committer->fsetArr); - tsdbFidKeyRange(fset->fid, committer->minutes, committer->precision, &minKey, &maxKey); - } - - if (committer->ctx->nextKey < TMIN(tsdb->imem->minKey, minKey)) { - committer->ctx->nextKey = TMIN(tsdb->imem->minKey, minKey); - } - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); From b936969a013e7cee86034f5ee31c0d58307dc0a9 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 15 Apr 2024 18:25:23 +0800 Subject: [PATCH 10/16] more fix --- source/dnode/vnode/src/vnd/vnodeSvr.c | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b5e049b692..8fcd64373f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -2060,12 +2060,14 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in tDecoderInit(pCoder, pReq, len); tDecodeDeleteRes(pCoder, pRes); - for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) { - uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid); - code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey); - if (code) goto _err; - code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, pRes->ctimeMs); - if (code) goto _err; + if (pRes->affectedRows > 0) { + for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) { + uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid); + code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey); + if (code) goto _err; + code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, pRes->ctimeMs); + if (code) goto _err; + } } code = tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len); From 9cce9c221d94c9868c1d634d0bbd418340cf0610 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 15 Apr 2024 18:32:06 +0800 Subject: [PATCH 11/16] cos/multi-write: empty impl for tsdb async compact --- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8a2438d5a4..e91437a699 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -2296,5 +2296,5 @@ _OVER: int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; } -int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync); +int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; } #endif From 87dfc1f931a0c8311ca3d8c8b6d3d27f3079b677 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Apr 2024 18:40:51 +0800 Subject: [PATCH 12/16] fix(stream):fix error in unit test cases. --- source/dnode/mnode/impl/test/stream/stream.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index 8d106b1ede..ae00f47ab7 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -62,8 +62,8 @@ SRpcMsg buildHbReq() { entry.id.taskId = 5; entry.id.streamId = defStreamId; - entry.checkpointId = 1; - entry.checkpointFailed = true; + entry.checkpointInfo.activeId = 1; + entry.checkpointInfo.failed = true; taosArrayPush(msg.pTaskStatus, &entry); } From 1ac192c069900ed8c034ca90bca11dbc679899e9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Apr 2024 18:41:38 +0800 Subject: [PATCH 13/16] fix(stream): keep the original tsdb scan version range. --- include/libs/stream/tstream.h | 3 ++- source/dnode/mnode/impl/src/mndStream.c | 7 ++++++- source/dnode/vnode/src/tq/tq.c | 16 ++++++++-------- source/dnode/vnode/src/tq/tqStreamTask.c | 14 ++++++++------ source/libs/executor/src/executor.c | 4 ++-- source/libs/stream/src/streamStart.c | 6 ++++-- 6 files changed, 30 insertions(+), 20 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5cecb1af42..8bced20ca3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -445,6 +445,7 @@ struct SStreamTask { SCheckpointInfo chkInfo; STaskExec exec; SDataRange dataRange; + SVersionRange step2Range; SHistoryTaskInfo hTaskInfo; STaskId streamTaskId; STaskExecStatisInfo execInfo; @@ -901,4 +902,4 @@ void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp); } #endif -#endif /* ifndef _STREAM_H_ */ \ No newline at end of file +#endif /* ifndef _STREAM_H_ */ diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 05b06e83a8..5d18d0d22e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1541,7 +1541,12 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS // history_task_id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); if (pe->hTaskId != 0) { - colDataSetVal(pColInfo, numOfRows, (const char*)&pe->hTaskId, false); + memset(idstr, 0, tListLen(idstr)); + len = tintToHex(pe->hTaskId, &idstr[4]); + idstr[2] = '0'; + idstr[3] = 'x'; + varDataSetLen(idstr, len + 2); + colDataSetVal(pColInfo, numOfRows, idstr, false); } else { colDataSetVal(pColInfo, numOfRows, 0, true); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 92dc55c0c3..791a2c2d92 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -800,33 +800,33 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) { const char* id = pTask->id.idStr; int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; - SVersionRange* pRange = &pTask->dataRange.range; + SVersionRange* pStep2Range = &pTask->step2Range; // if it's an source task, extract the last version in wal. bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer); pTask->execInfo.step2Start = taosGetTimestampMs(); if (done) { - qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pRange->minVer, - pRange->maxVer, 0.0); + qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer, + pStep2Range->maxVer, 0.0); streamTaskPutTranstateIntoInputQ(pTask); streamExecTask(pTask); // exec directly } else { STimeWindow* pWindow = &pTask->dataRange.window; - tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 + tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 ", do secondary scan-history from WAL after halt the related stream task:%s", - id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, + id, pTask->info.taskLevel, pStep2Range->minVer, pStep2Range->maxVer, pWindow->skey, pWindow->ekey, pStreamTask->id.idStr); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); + streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow); - int64_t dstVer = pTask->dataRange.range.minVer; + int64_t dstVer =pStep2Range->minVer; pTask->chkInfo.nextProcessVer = dstVer; walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, - pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); + pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE); /*int8_t status = */ streamTaskSetSchedStatusInactive(pTask); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 73508202d9..19e53c7d15 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -242,21 +242,23 @@ int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { // todo handle memory error bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { const char* id = pTask->id.idStr; - int64_t maxVer = pTask->dataRange.range.maxVer; + int64_t maxVer = pTask->step2Range.maxVer; - if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { + if ((pTask->info.fillHistory == 1) && ver > maxVer) { if (!pTask->status.appendTranstateBlock) { qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal anymore, add transfer-state block into inputQ", id, ver, maxVer); double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0; - qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); + qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs", + id, pTask->step2Range.minVer, maxVer, el); /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); return true; } else { - qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal", - id, ver, maxVer); + qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64 + ", not scan wal", + id, ver, pTask->step2Range.minVer, maxVer); } } @@ -389,7 +391,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { } int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); - int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX; + int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->step2Range.maxVer : INT64_MAX; taosThreadMutexLock(&pTask->lock); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 26a80cc6b5..12c5504007 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -926,8 +926,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan pStreamInfo->fillHistoryWindow = *pWindow; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2; - qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 - " - %" PRId64, + qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 "-%" PRId64 ", window:%" PRId64 + "-%" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); return 0; diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 32bd3742ad..3abca307da 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -868,8 +868,10 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe } else { // 2. do secondary scan of the history data, the time window remain, and the version range is updated to // [pTask->dataRange.range.maxVer, ver1] - pRange->minVer = walScanStartVer; - pRange->maxVer = nextProcessVer - 1; + pTask->step2Range.minVer = walScanStartVer; + pTask->step2Range.maxVer = nextProcessVer - 1; + stDebug("s-task:%s set step2 verRange:%" PRId64 "-%" PRId64 ", step1 verRange:%" PRId64 "-%" PRId64, pTask->id.idStr, + pTask->step2Range.minVer, pTask->step2Range.maxVer, pRange->minVer, pRange->maxVer); return false; } } From d92f0706e797f503c7a5db73f41ef6448f0cc461 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Apr 2024 18:43:58 +0800 Subject: [PATCH 14/16] fix(tsdb): check the boundary value when reseting range. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 69f4f82459..6100b8363e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4807,7 +4807,13 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { bool asc = ASCENDING_TRAVERSE(pReader->info.order); int32_t step = asc ? 1 : -1; - int64_t ts = asc ? pReader->info.window.skey - 1 : pReader->info.window.ekey + 1; + + int64_t ts = 0; + if (asc) { + ts = (pReader->info.window.skey > INT64_MIN)? pReader->info.window.skey-1:pReader->info.window.skey; + } else { + ts = (pReader->info.window.ekey < INT64_MAX)? pReader->info.window.ekey + 1:pReader->info.window.ekey; + } resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step); // no data in files, let's try buffer in memory From eba924776ff34f7257054a5450dbc6bfee34b2d9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Apr 2024 23:26:18 +0800 Subject: [PATCH 15/16] fix(stream): update test cases. --- tests/system-test/0-others/information_schema.py | 2 +- tests/system-test/1-insert/drop.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index c3d65482fc..ffdd9d191d 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -221,7 +221,7 @@ class TDTestCase: tdSql.checkEqual(20470,len(tdSql.queryResult)) tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") - tdSql.checkEqual(True, len(tdSql.queryResult) in range(215, 230)) + tdSql.checkEqual(True, len(tdSql.queryResult) in range(226, 241)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(54, len(tdSql.queryResult)) diff --git a/tests/system-test/1-insert/drop.py b/tests/system-test/1-insert/drop.py index 8775450ff0..21817ef20d 100644 --- a/tests/system-test/1-insert/drop.py +++ b/tests/system-test/1-insert/drop.py @@ -147,11 +147,11 @@ class TDTestCase: tdSql.execute(f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname') tdSql.query(f'select * from information_schema.ins_streams where stream_name = "{stream_name}"') print(tdSql.queryResult) - tdSql.checkEqual(tdSql.queryResult[0][2],f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname') + tdSql.checkEqual(tdSql.queryResult[0][4],f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname') tdSql.execute(f'drop stream {stream_name}') tdSql.execute(f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb') tdSql.query(f'select * from information_schema.ins_streams where stream_name = "{stream_name}"') - tdSql.checkEqual(tdSql.queryResult[0][2],f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb') + tdSql.checkEqual(tdSql.queryResult[0][4],f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb') tdSql.execute(f'drop database {self.dbname}') def run(self): self.drop_ntb_check() From 51dfa20cf48ab4e83a661faa9d500bd3ab25d109 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 Apr 2024 10:13:15 +0800 Subject: [PATCH 16/16] fix(tsdb): set correct initial value for compare. --- source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 +- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 45 ++++++++++++++++++---- 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 33cce621d5..aa750a291b 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -126,7 +126,7 @@ int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2); int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2); void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key); void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key); -int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc); + // STSDBRowIter int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 33a26ab3d5..8abdfaac6a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -124,7 +124,7 @@ int32_t pkCompEx(SRowKey* p1, SRowKey* p2) { if (p1->pks[0].val == p2->pks[0].val) { return 0; } else { - return p1->pks[0].val > p2->pks[0].val? 1:-1; + return tValueCompare(&p1->pks[0], &p2->pks[0]); } } } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index c805d2f93f..27b7f3edbe 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -136,20 +136,51 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in if (numOfPks > 0) { pKey->pks[0].type = type; - if (IS_NUMERIC_TYPE(pKey->pks[0].type)) { + + if (IS_NUMERIC_TYPE(type)) { if (asc) { - switch(pKey->pks[0].type) { - case TSDB_DATA_TYPE_BIGINT:pKey->pks[0].val = INT64_MIN;break; - case TSDB_DATA_TYPE_INT:pKey->pks[0].val = INT32_MIN;break; - case TSDB_DATA_TYPE_SMALLINT:pKey->pks[0].val = INT16_MIN;break; - case TSDB_DATA_TYPE_TINYINT:pKey->pks[0].val = INT8_MIN;break; + switch(type) { + case TSDB_DATA_TYPE_BIGINT: { + pKey->pks[0].val = INT64_MIN; + break; + } + case TSDB_DATA_TYPE_INT:{ + int32_t min = INT32_MIN; + memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); + break; + } + case TSDB_DATA_TYPE_SMALLINT:{ + int16_t min = INT16_MIN; + memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); + break; + } + case TSDB_DATA_TYPE_TINYINT:{ + int8_t min = INT8_MIN; + memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); + break; + } + case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_USMALLINT: + case TSDB_DATA_TYPE_UINT: + case TSDB_DATA_TYPE_UBIGINT: { + pKey->pks[0].val = 0; + break; + } + default: + ASSERT(0); } } else { - switch(pKey->pks[0].type) { + switch(type) { case TSDB_DATA_TYPE_BIGINT:pKey->pks[0].val = INT64_MAX;break; case TSDB_DATA_TYPE_INT:pKey->pks[0].val = INT32_MAX;break; case TSDB_DATA_TYPE_SMALLINT:pKey->pks[0].val = INT16_MAX;break; case TSDB_DATA_TYPE_TINYINT:pKey->pks[0].val = INT8_MAX;break; + case TSDB_DATA_TYPE_UBIGINT:pKey->pks[0].val = UINT64_MAX;break; + case TSDB_DATA_TYPE_UINT:pKey->pks[0].val = UINT32_MAX;break; + case TSDB_DATA_TYPE_USMALLINT:pKey->pks[0].val = UINT16_MAX;break; + case TSDB_DATA_TYPE_UTINYINT:pKey->pks[0].val = UINT8_MAX;break; + default: + ASSERT(0); } } } else {