From b9927cce1408760372f61b7168816318efaffaf3 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Mon, 29 Jan 2024 16:04:08 +0800 Subject: [PATCH 1/7] fix: count error on tag which is null --- source/common/src/tdatablock.c | 1 + source/libs/executor/src/scanoperator.c | 4 ++-- source/libs/function/src/builtinsimpl.c | 6 ++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5382259899..8739068432 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -224,6 +224,7 @@ static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t cur int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows, bool trimValue) { + if (currentRow >= numOfRows) return TSDB_CODE_SUCCESS; int32_t len = pColumnInfoData->info.bytes; if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { len = varDataTLen(pData); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9243f385d0..1b69bcce94 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -684,7 +684,7 @@ void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) { static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBase* pBase, SSDataBlock* pBlock, const STableKeyInfo* tbInfo) { blockDataEmpty(pBlock); - pBlock->info.rows = 1; + pBlock->info.rows = 0; pBlock->info.id.uid = tbInfo->uid; pBlock->info.id.groupId = tbInfo->groupId; @@ -696,7 +696,7 @@ static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBas } // set tag/tbname - doSetTagColumnData(pBase, pBlock, pTaskInfo, pBlock->info.rows); + doSetTagColumnData(pBase, pBlock, pTaskInfo, 1); return pBlock; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 000f634fe5..fc1fe29332 100644 --- 
a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2338,6 +2338,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } + if(pInput->totalRows == 0) return TSDB_CODE_SUCCESS; + SColumnDataAgg* pColAgg = (pInput->colDataSMAIsSet) ? pInput->pColumnDataAgg[0] : NULL; TSKEY startKey = getRowPTs(pInput->pPTS, 0); @@ -2647,7 +2649,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { int32_t bytes = pInputCol->info.bytes; pInfo->bytes = bytes; - if (IS_NULL_TYPE(type)) { + if (IS_NULL_TYPE(type) || 0 == pInput->totalRows) { return TSDB_CODE_SUCCESS; } @@ -6022,7 +6024,7 @@ int32_t groupKeyFunction(SqlFunctionCtx* pCtx) { goto _group_key_over; } - if (colDataIsNull_s(pInputCol, startIndex)) { + if (pInputCol->pData == NULL || colDataIsNull_s(pInputCol, startIndex)) { pInfo->isNull = true; pInfo->hasResult = true; goto _group_key_over; From 49700404593568ede97c1b11bbe899bd327a3dcc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Jan 2024 15:22:49 +0800 Subject: [PATCH 2/7] refactor: do some internal refactor. 
--- include/libs/stream/tstream.h | 9 +++++---- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/libs/executor/src/executor.c | 2 +- source/libs/stream/src/streamExec.c | 10 ++++++++-- source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamStart.c | 2 -- source/libs/stream/src/streamState.c | 1 - 7 files changed, 17 insertions(+), 13 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9b3ce36bdd..747ba34c97 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -324,12 +324,13 @@ typedef struct SStreamStatus { int8_t taskStatus; int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set int8_t schedStatus; - int32_t schedIdleTime; // idle time before invoke again - int64_t lastExecTs; // last exec time stamp int8_t statusBackup; - bool appendTranstateBlock; // has append the transfer state data block already - int32_t timerActive; // timer is active + int32_t schedIdleTime; // idle time before invoke again + int32_t timerActive; // timer is active + int64_t lastExecTs; // last exec time stamp int32_t inScanHistorySentinel; + bool appendTranstateBlock; // has append the transfer state data block already + bool supplementaryWalscan; // complete the supplementary wal scan or not } SStreamStatus; typedef struct SDataRange { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a689932754..1ade1c8c41 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -835,8 +835,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { SCheckpointInfo* pChkInfo = &pTask->chkInfo; // checkpoint ver is the kept version, handled data should be the next version. 
- if (pTask->chkInfo.checkpointId != 0) { - pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; + if (pChkInfo->checkpointId != 0) { + pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e1a8e8ea01..9645fc1b7a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1080,7 +1080,7 @@ bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) { int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; + STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; qDebug("%s remove timeWindow filter:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 27748c84a0..0c56679c14 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -390,6 +390,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pTimeWindow->skey = INT64_MIN; qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); + stDebug("s-task:%s after exceed the threshold:%" PRId64 " and then update the window filter", + pStreamTask->id.idStr, pStreamTask->dataRange.range.maxVer); } else { stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr); } @@ -398,9 +400,11 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); + // 3. 
scan wal file from the beginning till the end version of fill-history task. + streamTaskSupplementaryScan(pStreamTask); + // 3. send msg to mnode to launch a checkpoint to keep the state for current stream - streamTaskSendCheckpointReq(pStreamTask); -// streamTaskResume(pStreamTask); +// streamTaskSendCheckpointReq(pStreamTask); // 4. assign the status to the value that will be kept in disk pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state; @@ -777,6 +781,8 @@ int32_t streamResumeTask(SStreamTask* pTask) { while (1) { /*int32_t code = */ doStreamExecTask(pTask); + + // check if continue taosThreadMutexLock(&pTask->lock); int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8ad20f357c..5e53a921b9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1322,7 +1322,7 @@ void streamMetaRUnLock(SStreamMeta* pMeta) { if (code != TSDB_CODE_SUCCESS) { stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code); } else { - stDebug("vgId:%d meta-runlock completed", pMeta->vgId); +// stTrace("vgId:%d meta-runlock completed", pMeta->vgId); } } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 20fdcff7d9..2f5bca8ed9 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -385,7 +385,6 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { void doProcessDownstreamReadyRsp(SStreamTask* pTask) { EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? 
TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; - streamTaskOnHandleEventSuccess(pTask->status.pSM, event); int64_t initTs = pTask->execInfo.init; @@ -989,4 +988,3 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { streamSetParamForStreamScannerStep2(pTask, &verRange, &win); } } - diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 19b7359981..84b0a777f2 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -670,7 +670,6 @@ void streamStateFreeCur(SStreamStateCur* pCur) { if (!pCur) { return; } - qDebug("streamStateFreeCur"); streamStateResetCur(pCur); taosMemoryFree(pCur); } From 1d110953c9bc091f2181b9def8b7f6018161c68f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Jan 2024 15:23:54 +0800 Subject: [PATCH 3/7] refactor: do some internal refactor. --- source/libs/stream/src/streamExec.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 0c56679c14..b0170d5083 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -400,11 +400,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); - // 3. scan wal file from the beginning till the end version of fill-history task. - streamTaskSupplementaryScan(pStreamTask); - // 3. send msg to mnode to launch a checkpoint to keep the state for current stream -// streamTaskSendCheckpointReq(pStreamTask); + streamTaskSendCheckpointReq(pStreamTask); // 4. assign the status to the value that will be kept in disk pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state; From efbc7e14024aa09f5ffd5d52f079596393d3cb23 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Jan 2024 15:35:01 +0800 Subject: [PATCH 4/7] fix(stream): fix memory leak. 
--- source/dnode/mnode/impl/src/mndStreamHb.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 19c4f22280..ff0d8b82fc 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -183,6 +183,7 @@ static int32_t mndDropOrphanTasks(SMnode* pMnode, SArray* pList) { return -1; } + mndTransDrop(pTrans); return 0; } From 5d731dc9a4d039acf72baeaa1833be4742945e3d Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 30 Jan 2024 17:34:06 +0800 Subject: [PATCH 5/7] fix: use blank data flag --- include/common/tcommon.h | 1 + include/libs/function/function.h | 1 + source/common/src/tdatablock.c | 1 - source/libs/executor/src/executorInt.c | 2 ++ source/libs/executor/src/scanoperator.c | 3 ++- source/libs/function/src/builtinsimpl.c | 7 ++++--- 6 files changed, 10 insertions(+), 5 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 24e5d186b9..d4537ddc89 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -206,6 +206,7 @@ typedef struct SDataBlockInfo { int16_t hasVarCol; int16_t dataLoad; // denote if the data is loaded or not uint8_t scanFlag; + bool blankFill; // TODO: optimize and remove following int64_t version; // used for stream, and need serialization diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 8863201094..0fa84c99c6 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -114,6 +114,7 @@ typedef struct SInputColumnInfoData { int32_t totalRows; // total rows in current columnar data int32_t startRowIndex; // handle started row index int64_t numOfRows; // the number of rows needs to be handled + bool blankFill; // fill blank data to block for empty table int32_t numOfInputCols; // PTS is not included bool colDataSMAIsSet; // if agg is set or not SColumnInfoData *pPTS; // primary timestamp 
column diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 8739068432..5382259899 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -224,7 +224,6 @@ static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t cur int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows, bool trimValue) { - if (currentRow >= numOfRows) return TSDB_CODE_SUCCESS; int32_t len = pColumnInfoData->info.bytes; if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { len = varDataTLen(pData); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index ff4d3d0d27..76dc622cfd 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -311,6 +311,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int pInput->totalRows = pBlock->info.rows; pInput->numOfRows = pBlock->info.rows; pInput->startRowIndex = 0; + pInput->blankFill = pBlock->info.blankFill; // NOTE: the last parameter is the primary timestamp column // todo: refactor this @@ -325,6 +326,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int pInput->totalRows = pBlock->info.rows; pInput->numOfRows = pBlock->info.rows; pInput->startRowIndex = 0; + pInput->blankFill = pBlock->info.blankFill; code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1b69bcce94..1299638935 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -684,9 +684,10 @@ void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) { static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBase* pBase, SSDataBlock* pBlock, const STableKeyInfo* tbInfo) { 
blockDataEmpty(pBlock); - pBlock->info.rows = 0; + pBlock->info.rows = 1; pBlock->info.id.uid = tbInfo->uid; pBlock->info.id.groupId = tbInfo->groupId; + pBlock->info.blankFill = true; // only one row: set all col data to null & hasNull int32_t col_num = blockDataGetNumOfCols(pBlock); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fc1fe29332..5ab6d5e075 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -499,6 +499,9 @@ static int64_t getNumOfElems(SqlFunctionCtx* pCtx) { */ SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; + if(1 == pInput->numOfRows && pInput->blankFill) { + return 0; + } if (pInput->colDataSMAIsSet && pInput->totalRows == pInput->numOfRows) { numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull; } else { @@ -2338,8 +2341,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } - if(pInput->totalRows == 0) return TSDB_CODE_SUCCESS; - SColumnDataAgg* pColAgg = (pInput->colDataSMAIsSet) ? 
pInput->pColumnDataAgg[0] : NULL; TSKEY startKey = getRowPTs(pInput->pPTS, 0); @@ -2649,7 +2650,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { int32_t bytes = pInputCol->info.bytes; pInfo->bytes = bytes; - if (IS_NULL_TYPE(type) || 0 == pInput->totalRows) { + if (IS_NULL_TYPE(type)) { return TSDB_CODE_SUCCESS; } From 917a6eb5a7861e97ae4ce5d2ed8b71781a0c9a38 Mon Sep 17 00:00:00 2001 From: dmchen Date: Tue, 30 Jan 2024 09:47:20 +0000 Subject: [PATCH 6/7] fix/TD-28503 --- source/dnode/mnode/impl/src/mndCompact.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c index 101022a44f..4e71684372 100644 --- a/source/dnode/mnode/impl/src/mndCompact.c +++ b/source/dnode/mnode/impl/src/mndCompact.c @@ -599,7 +599,8 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished, pDetail->newNumberFileset, pDetail->newFinished); - if(pDetail->numberFileset < pDetail->newNumberFileset || pDetail->finished < pDetail->newFinished) + // these two numbers can jump back after a dnode restart, so compare with != instead of < + if(pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished) needSave = true; } From 9c0d2e603a9e649c0a6699202d8990c844f75078 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 30 Jan 2024 20:24:59 +0800 Subject: [PATCH 7/7] reset blankFill --- source/libs/executor/src/scanoperator.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1299638935..bbe12c345e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -663,6 +663,8 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1); +
pInfo->pResBlock->info.blankFill = false; + if (!pInfo->needCountEmptyTable) { pInfo->countState = TABLE_COUNT_STATE_END; } else {