From 7ee05df7c6d9f66efbc1e254213f698414f69b37 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 8 Jun 2023 17:46:25 +0800 Subject: [PATCH] fix lastBlock remain ts not saved --- source/libs/executor/src/timesliceoperator.c | 39 +++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 65e3e75bce..1aa6027a1d 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -45,6 +45,7 @@ typedef struct STimeSliceOperatorInfo { SGroupKeys* pPrevGroupKey; SSDataBlock* pNextGroupRes; SSDataBlock* pRemainRes; // save block unfinished processing + int64_t remainTs; // the remaining timestamp in the block to be processed } STimeSliceOperatorInfo; static void destroyTimeSliceOperatorInfo(void* param); @@ -642,11 +643,9 @@ static int32_t resetKeeperInfo(STimeSliceOperatorInfo* pInfo) { return TSDB_CODE_SUCCESS; } -static bool checkThresholdReached(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, int32_t threshold) { +static bool checkThresholdReached(STimeSliceOperatorInfo* pSliceInfo, int32_t threshold) { SSDataBlock* pResBlock = pSliceInfo->pRes; - if (pResBlock->info.rows > threshold) { - pSliceInfo->pRemainRes = pBlock; return true; } @@ -661,6 +660,16 @@ static bool checkWindowBoundReached(STimeSliceOperatorInfo* pSliceInfo) { return false; } +static void saveBlockStatus(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, int32_t curIndex) { + SSDataBlock* pResBlock = pSliceInfo->pRes; + + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); + if (curIndex < pBlock->info.rows - 1) { + pSliceInfo->pRemainRes = pBlock; + pSliceInfo->remainTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1); + } +} + static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, bool ignoreNull) { SSDataBlock* pResBlock = pSliceInfo->pRes; @@ -670,6 +679,12 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS for (int32_t i = 0; i < pBlock->info.rows; ++i) { int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); + // check if need to resume from the position of last unfinished block + if (pSliceInfo->pRemainRes != NULL && ts < pSliceInfo->remainTs && + pSliceInfo->current <= pSliceInfo->remainTs) { + continue; + } + // check for duplicate timestamps if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP); @@ -681,7 +696,8 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS if (checkWindowBoundReached(pSliceInfo)) { break; } - if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) { + if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { + saveBlockStatus(pSliceInfo, pBlock, i); return; } @@ -697,7 +713,8 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS if (checkWindowBoundReached(pSliceInfo)) { break; } - if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) { + if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { + saveBlockStatus(pSliceInfo, pBlock, i); return; } } else if (ts < pSliceInfo->current) { @@ -723,7 +740,8 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS if (checkWindowBoundReached(pSliceInfo)) { break; } - if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) { + if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { + saveBlockStatus(pSliceInfo, pBlock, i); return; } } else { @@ -759,7 +777,8 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS if (checkWindowBoundReached(pSliceInfo)) { break; } - if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) { + if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { + saveBlockStatus(pSliceInfo, pBlock, i); return; } } @@ -854,7 +873,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { while (1) { if (pSliceInfo->pNextGroupRes != NULL) { doHandleTimeslice(pOperator, pSliceInfo->pNextGroupRes); - if (checkThresholdReached(pSliceInfo, pSliceInfo->pRemainRes, pOperator->resultInfo.threshold)) { + if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); goto _finished; } @@ -879,7 +898,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } doHandleTimeslice(pOperator, pBlock); - if (checkThresholdReached(pSliceInfo, pSliceInfo->pRemainRes, pOperator->resultInfo.threshold)) { + if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); goto _finished; } @@ -943,6 +962,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries); pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); initResultSizeInfo(&pOperator->resultInfo, 4096); + pOperator->resultInfo.threshold = 1; pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues); pInfo->pLinearInfo = NULL; @@ -956,6 +976,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->pPrevGroupKey = NULL; pInfo->pNextGroupRes = NULL; pInfo->pRemainRes = NULL; + pInfo->remainTs = 0; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;