From 0ec4bbdfbc1959207f8192795c385e2668c3646a Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 26 Apr 2023 09:48:13 +0800 Subject: [PATCH 01/22] enable interp used with super table --- source/libs/parser/src/parTranslater.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 25e92a55ec..b35962e6b8 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1518,9 +1518,7 @@ static int32_t translateInterpFunc(STranslateContext* pCxt, SFunctionNode* pFunc SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt; SNode* pTable = pSelect->pFromTable; - if ((NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) || - (TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType && - TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType)))) { + if ((NULL != pTable && QUERY_NODE_REAL_TABLE != nodeType(pTable))) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE, "%s is only supported in single table query", pFunc->functionName); } From d4c783b5901db4d509ad63825e0767ce81c62918 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 26 Apr 2023 09:48:43 +0800 Subject: [PATCH 02/22] check duplicate timestamps --- source/libs/executor/src/timesliceoperator.c | 31 ++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index f0e25d8cc5..4d519f82f1 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -36,6 +36,8 @@ typedef struct STimeSliceOperatorInfo { SColumn tsCol; // primary timestamp column SExprSupp scalarSup; // scalar calculation struct SFillColInfo* pFillColInfo; // fill column info + int64_t prevTs; + bool prevTsSet; } STimeSliceOperatorInfo; static void destroyTimeSliceOperatorInfo(void* param); @@ -166,6 +168,28 @@ static bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) { return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0); } +static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumnInfoData* pTsCol, + int32_t curIndex, int32_t rows) { + + + int64_t currentTs = *(int64_t*)colDataGetData(pTsCol, curIndex); + if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevTs)) { + return true; + } + + pSliceInfo->prevTsSet = true; + pSliceInfo->prevTs = currentTs; + + if (curIndex < rows - 1) { + int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1); + if (currentTs == nextTs) { + return true; + } + } + + return false; +} + static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, bool beforeTs) { int32_t rows = pResBlock->info.rows; @@ -472,6 +496,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { for (int32_t i = 0; i < pBlock->info.rows; ++i) { int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); + // check for duplicate timestamps + if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP); + } + if (pSliceInfo->current > pSliceInfo->win.ekey) { setOperatorCompleted(pOperator); break; @@ -612,6 +641,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->win = pInterpPhyNode->timeRange; pInfo->interval.interval = pInterpPhyNode->interval; pInfo->current = pInfo->win.skey; + pInfo->prevTsSet = false; + pInfo->prevTs = 0; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; From c56bb64f1c7a06ca79bfb4ea4f2972ea1baac81b Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 26 Apr 2023 11:04:13 +0800 Subject: [PATCH 03/22] refactor timeslice operator --- source/libs/executor/src/timesliceoperator.c | 185 ++++++++++--------- 1 file changed, 98 insertions(+), 87 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 4d519f82f1..e1782107f4 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -38,6 +38,8 @@ typedef struct STimeSliceOperatorInfo { struct SFillColInfo* pFillColInfo; // fill column info int64_t prevTs; bool prevTsSet; + uint64_t groupId; + SSDataBlock* pNextGroupRes; } STimeSliceOperatorInfo; static void destroyTimeSliceOperatorInfo(void* param); @@ -456,6 +458,99 @@ static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock return TSDB_CODE_SUCCESS; } +static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, + SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock = pSliceInfo->pRes; + SInterval* pInterval = &pSliceInfo->interval; + + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); + + // check for duplicate timestamps + if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP); + } + + if (pSliceInfo->current > pSliceInfo->win.ekey) { + setOperatorCompleted(pOperator); + break; + } + + if (ts == pSliceInfo->current) { + addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); + + doKeepPrevRows(pSliceInfo, pBlock, i); + doKeepLinearInfo(pSliceInfo, pBlock, i); + + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + if (pSliceInfo->current > pSliceInfo->win.ekey) { + setOperatorCompleted(pOperator); + break; + } + } else if (ts < pSliceInfo->current) { + // in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate + doKeepPrevRows(pSliceInfo, pBlock, i); + doKeepLinearInfo(pSliceInfo, pBlock, i); + + if (i < pBlock->info.rows - 1) { + // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate + doKeepNextRows(pSliceInfo, pBlock, i + 1); + int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); + if (nextTs > pSliceInfo->current) { + while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { + if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false) && + pSliceInfo->fillType == TSDB_FILL_LINEAR) { + break; + } else { + pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, + pInterval->precision); + } + } + + if (pSliceInfo->current > pSliceInfo->win.ekey) { + setOperatorCompleted(pOperator); + break; + } + } else { + // ignore current row, and do nothing + } + } else { // it is the last row of current block + doKeepPrevRows(pSliceInfo, pBlock, i); + } + } else { // ts > pSliceInfo->current + // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate + doKeepNextRows(pSliceInfo, pBlock, i); + doKeepLinearInfo(pSliceInfo, pBlock, i); + + while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { + if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, true) && + pSliceInfo->fillType == TSDB_FILL_LINEAR) { + break; + } else { + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + } + } + + // add current row if timestamp match + if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) { + addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); + + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + } + doKeepPrevRows(pSliceInfo, pBlock, i); + + if (pSliceInfo->current > pSliceInfo->win.ekey) { + setOperatorCompleted(pOperator); + break; + } + } + } +} + static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -491,93 +586,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); - - SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); - - // check for duplicate timestamps - if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP); - } - - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - - if (ts == pSliceInfo->current) { - addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); - - doKeepPrevRows(pSliceInfo, pBlock, i); - doKeepLinearInfo(pSliceInfo, pBlock, i); - - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - } else if (ts < pSliceInfo->current) { - // in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate - doKeepPrevRows(pSliceInfo, pBlock, i); - doKeepLinearInfo(pSliceInfo, pBlock, i); - - if (i < pBlock->info.rows - 1) { - // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate - doKeepNextRows(pSliceInfo, pBlock, i + 1); - int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); - if (nextTs > pSliceInfo->current) { - while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { - if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false) && - pSliceInfo->fillType == TSDB_FILL_LINEAR) { - break; - } else { - pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, - pInterval->precision); - } - } - - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - } else { - // ignore current row, and do nothing - } - } else { // it is the last row of current block - doKeepPrevRows(pSliceInfo, pBlock, i); - } - } else { // ts > pSliceInfo->current - // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate - doKeepNextRows(pSliceInfo, pBlock, i); - doKeepLinearInfo(pSliceInfo, pBlock, i); - - while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { - if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, true) && - pSliceInfo->fillType == TSDB_FILL_LINEAR) { - break; - } else { - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } - } - - // add current row if timestamp match - if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) { - addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); - - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } - doKeepPrevRows(pSliceInfo, pBlock, i); - - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - } - } + doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo); } // check if need to interpolate after last datablock @@ -643,6 +652,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->current = pInfo->win.skey; pInfo->prevTsSet = false; pInfo->prevTs = 0; + pInfo->groupId = 0; + pInfo->pNextGroupRes = NULL; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; From fc6ca69afad969d883e707ea1315851e6d64d692 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 26 Apr 2023 11:04:13 +0800 Subject: [PATCH 04/22] refactor timeslice operator --- source/libs/executor/src/timesliceoperator.c | 65 +++++++++++++------- 1 file changed, 44 insertions(+), 21 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index e1782107f4..7024b092b0 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -551,6 +551,11 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS } } +static void revertTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) { + pSliceInfo->current = pSliceInfo->win.skey; + pSliceInfo->prevTsSet = false; +} + static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -569,34 +574,52 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { blockDataCleanup(pResBlock); while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - if (pBlock == NULL) { - break; + if (pSliceInfo->pNextGroupRes != NULL) { + doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo); + pSliceInfo->pNextGroupRes = NULL; } - if (pSliceInfo->scalarSup.pExprInfo != NULL) { - SExprSupp* pExprSup = &pSliceInfo->scalarSup; - projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + while (1) { + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + if (pBlock == NULL) { + break; + } + + if (pSliceInfo->groupId == 0 && pBlock->info.id.groupId != 0) { + pSliceInfo->groupId = pBlock->info.id.groupId; + } else { + if (pSliceInfo->groupId != pBlock->info.id.groupId) { + pSliceInfo->groupId = pBlock->info.id.groupId; + pSliceInfo->pNextGroupRes = pBlock; + break; + } + } + + if (pSliceInfo->scalarSup.pExprInfo != NULL) { + SExprSupp* pExprSup = &pSliceInfo->scalarSup; + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + } + + int32_t code = initKeeperInfo(pSliceInfo, pBlock); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } + + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); + doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo); } - int32_t code = initKeeperInfo(pSliceInfo, pBlock); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); + // check if need to interpolate after last datablock + // except for fill(next), fill(linear) + while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && + pSliceInfo->fillType != TSDB_FILL_LINEAR) { + genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false); + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); } - - // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); - doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo); } - // check if need to interpolate after last datablock - // except for fill(next), fill(linear) - while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && - pSliceInfo->fillType != TSDB_FILL_LINEAR) { - genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false); - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); From 75f76a62ab49d0bd250eb0464c23c20725f9cc80 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 26 Apr 2023 18:07:19 +0800 Subject: [PATCH 05/22] fix process next group bug --- source/libs/executor/src/timesliceoperator.c | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 7024b092b0..08121260d1 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -473,7 +473,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS } if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); break; } @@ -486,7 +485,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); break; } } else if (ts < pSliceInfo->current) { @@ -510,7 +508,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS } if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); break; } } else { @@ -544,14 +541,13 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS doKeepPrevRows(pSliceInfo, pBlock, i); if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); break; } } } } -static void revertTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) { +static void restoreTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) { pSliceInfo->current = pSliceInfo->win.skey; pSliceInfo->prevTsSet = false; } @@ -582,6 +578,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { + setOperatorCompleted(pOperator); break; } @@ -591,7 +588,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pSliceInfo->groupId != pBlock->info.id.groupId) { pSliceInfo->groupId = pBlock->info.id.groupId; pSliceInfo->pNextGroupRes = pBlock; - break; + restoreTimesliceInfo(pSliceInfo); + goto _group_over; } } @@ -618,11 +616,14 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); } + +_group_over: + doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); + if (pOperator->status == OP_EXEC_DONE) { + break; + } } - - doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); - // restore the value setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); if (pResBlock->info.rows == 0) { From c64788ad71dd5a50f0330072044de4a8a1932923 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 26 Apr 2023 18:37:23 +0800 Subject: [PATCH 06/22] fix group restore --- source/libs/executor/src/timesliceoperator.c | 26 +++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 08121260d1..a192e57e8a 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -547,6 +547,18 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS } } +static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator) { + SSDataBlock* pResBlock = pSliceInfo->pRes; + SInterval* pInterval = &pSliceInfo->interval; + + while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && + pSliceInfo->fillType != TSDB_FILL_LINEAR) { + genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false); + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + } +} + static void restoreTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) { pSliceInfo->current = pSliceInfo->win.skey; pSliceInfo->prevTsSet = false; @@ -588,8 +600,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pSliceInfo->groupId != pBlock->info.id.groupId) { pSliceInfo->groupId = pBlock->info.id.groupId; pSliceInfo->pNextGroupRes = pBlock; - restoreTimesliceInfo(pSliceInfo); - goto _group_over; + break; } } @@ -610,18 +621,15 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // check if need to interpolate after last datablock // except for fill(next), fill(linear) - while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && - pSliceInfo->fillType != TSDB_FILL_LINEAR) { - genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false); - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } + genInterpAfterDataBlock(pSliceInfo, pOperator); -_group_over: doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); if (pOperator->status == OP_EXEC_DONE) { break; } + + // restore initial value for next group + restoreTimesliceInfo(pSliceInfo); } // restore the value From 54d82fce8ae09c973d9c0eb65f053bc891742eb4 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 27 Apr 2023 11:28:20 +0800 Subject: [PATCH 07/22] add reset when switching groups --- source/libs/executor/src/timesliceoperator.c | 59 +++++++++++++++++++- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index a192e57e8a..c48b0735fb 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -458,6 +458,60 @@ static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock return TSDB_CODE_SUCCESS; } +static int32_t resetPrevRowsKeeper(STimeSliceOperatorInfo* pInfo) { + if (pInfo->pPrevRow == NULL) { + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) { + SGroupKeys *pKey = taosArrayGet(pInfo->pPrevRow, i); + pKey->isNull = false; + } + + pInfo->isPrevRowSet = false; + + return TSDB_CODE_SUCCESS; +} + +static int32_t resetNextRowsKeeper(STimeSliceOperatorInfo* pInfo) { + if (pInfo->pNextRow == NULL) { + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) { + SGroupKeys *pKey = taosArrayGet(pInfo->pPrevRow, i); + pKey->isNull = false; + } + + pInfo->isNextRowSet = false; + + return TSDB_CODE_SUCCESS; +} + +static int32_t resetFillLinearInfo(STimeSliceOperatorInfo* pInfo) { + if (pInfo->pLinearInfo == NULL) { + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) { + SFillLinearInfo *pLinearInfo = taosArrayGet(pInfo->pLinearInfo, i); + pLinearInfo->start.key = INT64_MIN; + pLinearInfo->end.key = INT64_MIN; + pLinearInfo->isStartSet = false; + pLinearInfo->isEndSet = false; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t resetKeeperInfo(STimeSliceOperatorInfo* pInfo) { + resetPrevRowsKeeper(pInfo); + resetNextRowsKeeper(pInfo); + resetFillLinearInfo(pInfo); + + return TSDB_CODE_SUCCESS; +} + static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SSDataBlock* pResBlock = pSliceInfo->pRes; @@ -559,9 +613,10 @@ static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperato } } -static void restoreTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) { +static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) { pSliceInfo->current = pSliceInfo->win.skey; pSliceInfo->prevTsSet = false; + resetKeeperInfo(pSliceInfo); } static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { @@ -629,7 +684,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } // restore initial value for next group - restoreTimesliceInfo(pSliceInfo); + resetTimesliceInfo(pSliceInfo); } // restore the value From 76eea9f7c49cd6db80f171c5e9521d0780d18528 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 28 Apr 2023 14:59:35 +0800 Subject: [PATCH 08/22] fix group_key check --- source/libs/executor/src/timesliceoperator.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index c48b0735fb..7b0da0608a 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -192,6 +192,11 @@ static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumn return false; } +static bool isInterpFunc(SExprInfo* pExprInfo) { + char *name = pExprInfo->pExpr->_function.functionName; + return (strcasecmp(name, "interp") == 0); +} + static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, bool beforeTs) { int32_t rows = pResBlock->info.rows; @@ -213,6 +218,8 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp bool isFilled = true; colDataAppend(pDst, pResBlock->info.rows, (char*)&isFilled, false); continue; + } else if (!isInterpFunc(pExprInfo)) { + continue; } int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; From 42df9f587f1e2f8eae33230e0f648b4e48716f49 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 28 Apr 2023 16:41:36 +0800 Subject: [PATCH 09/22] fix group_key not set properly --- source/libs/executor/src/timesliceoperator.c | 41 ++++++++++++++++---- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 7b0da0608a..310c3c5674 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -39,6 +39,7 @@ typedef struct STimeSliceOperatorInfo { int64_t prevTs; bool prevTsSet; uint64_t groupId; + SSDataBlock* pCurrentGroupRes; SSDataBlock* pNextGroupRes; } STimeSliceOperatorInfo; @@ -197,18 +198,24 @@ static bool isInterpFunc(SExprInfo* pExprInfo) { return (strcasecmp(name, "interp") == 0); } +static bool isGroupKeyFunc(SExprInfo* pExprInfo) { + char *name = pExprInfo->pExpr->_function.functionName; + return (strcasecmp(name, "_group_key") == 0); +} + static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, - bool beforeTs) { + SSDataBlock* pSrcBlock, int32_t index, bool beforeTs) { int32_t rows = pResBlock->info.rows; timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock); // todo set the correct primary timestamp column + // output the result bool hasInterp = true; for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; - int32_t dstSlot = pExprInfo->base.resSchema.slotId; + int32_t dstSlot = pExprInfo->base.resSchema.slotId; SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); if (isIrowtsPseudoColumn(pExprInfo)) { @@ -219,6 +226,18 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp colDataAppend(pDst, pResBlock->info.rows, (char*)&isFilled, false); continue; } else if (!isInterpFunc(pExprInfo)) { + if (isGroupKeyFunc(pExprInfo)) { + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); + + if (colDataIsNull_s(pSrc, index)) { + colDataSetNULL(pDst, pResBlock->info.rows); + continue; + } + + char* v = colDataGetData(pSrc, index); + colDataSetVal(pDst, pResBlock->info.rows, v, false); + } continue; } @@ -345,7 +364,7 @@ static void addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* bool isFilled = false; colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false); } else { - int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); if (colDataIsNull_s(pSrc, index)) { @@ -559,7 +578,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { - if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false) && + if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false) && pSliceInfo->fillType == TSDB_FILL_LINEAR) { break; } else { @@ -583,7 +602,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS doKeepLinearInfo(pSliceInfo, pBlock, i); while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { - if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, true) && + if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true) && pSliceInfo->fillType == TSDB_FILL_LINEAR) { break; } else { @@ -608,13 +627,14 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS } } -static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator) { +static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, SSDataBlock* pSrcBlock, + int32_t index) { SSDataBlock* pResBlock = pSliceInfo->pRes; SInterval* pInterval = &pSliceInfo->interval; while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && pSliceInfo->fillType != TSDB_FILL_LINEAR) { - genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false); + genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pSrcBlock, index, false); pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); } @@ -645,7 +665,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { while (1) { if (pSliceInfo->pNextGroupRes != NULL) { + setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true); doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo); + pSliceInfo->pCurrentGroupRes = pSliceInfo->pNextGroupRes; pSliceInfo->pNextGroupRes = NULL; } @@ -679,11 +701,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo); + pSliceInfo->pCurrentGroupRes = pBlock; } // check if need to interpolate after last datablock // except for fill(next), fill(linear) - genInterpAfterDataBlock(pSliceInfo, pOperator); + genInterpAfterDataBlock(pSliceInfo, pOperator, pSliceInfo->pCurrentGroupRes, + pSliceInfo->pCurrentGroupRes->info.rows - 1); doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); if (pOperator->status == OP_EXEC_DONE) { @@ -748,6 +772,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->prevTs = 0; pInfo->groupId = 0; pInfo->pNextGroupRes = NULL; + pInfo->pCurrentGroupRes = NULL; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; From 7b01cad063afcfc6ab037a375ba24dad2b5efe97 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 4 May 2023 18:58:07 +0800 Subject: [PATCH 10/22] add test cases --- tests/system-test/2-query/interp.py | 188 +++++++++++++++++++++++++--- 1 file changed, 172 insertions(+), 16 deletions(-) diff --git a/tests/system-test/2-query/interp.py b/tests/system-test/2-query/interp.py index ddf3f2534d..166fd6b686 100644 --- a/tests/system-test/2-query/interp.py +++ b/tests/system-test/2-query/interp.py @@ -23,6 +23,7 @@ class TDTestCase: stbname = "stb" ctbname1 = "ctb1" ctbname2 = "ctb2" + ctbname3 = "ctb3" tdSql.prepare() @@ -816,17 +817,26 @@ class TDTestCase: ) tdSql.execute( - f'''create table if not exists {dbname}.{ctbname2} using {dbname}.{stbname} tags(1) + f'''create table if not exists {dbname}.{ctbname2} using {dbname}.{stbname} tags(2) ''' ) - tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:05', 5, 5, 5, 5, 5.0, 5.0, true, 'varchar', 'nchar')") - tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:10', 10, 10, 10, 10, 10.0, 10.0, true, 'varchar', 'nchar')") - tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')") + tdSql.execute( + f'''create table if not exists {dbname}.{ctbname3} using {dbname}.{stbname} tags(3) + ''' + ) - tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-02 00:00:05', 5, 5, 5, 5, 5.0, 5.0, true, 'varchar', 'nchar')") - tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-02 00:00:10', 10, 10, 10, 10, 10.0, 10.0, true, 'varchar', 'nchar')") - tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-02 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')") + tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:01', 1, 1, 1, 1, 1.0, 1.0, true, 'varchar', 'nchar')") + tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:07', 7, 7, 7, 7, 7.0, 7.0, true, 'varchar', 'nchar')") + tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:13', 13, 13, 13, 13, 13.0, 13.0, true, 'varchar', 'nchar')") + + tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-01 00:00:03', 3, 3, 3, 3, 3.0, 3.0, true, 'varchar', 'nchar')") + tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-01 00:00:09', 9, 9, 9, 9, 9.0, 9.0, true, 'varchar', 'nchar')") + tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')") + + tdSql.execute(f"insert into {dbname}.{ctbname3} values ('2020-02-01 00:00:05', 5, 5, 5, 5, 5.0, 5.0, true, 'varchar', 'nchar')") + tdSql.execute(f"insert into {dbname}.{ctbname3} values ('2020-02-01 00:00:11', 11, 11, 11, 11, 11.0, 11.0, true, 'varchar', 'nchar')") + tdSql.execute(f"insert into {dbname}.{ctbname3} values ('2020-02-01 00:00:17', 17, 17, 17, 17, 17.0, 17.0, true, 'varchar', 'nchar')") tdSql.execute(f"flush database {dbname}"); @@ -834,7 +844,7 @@ class TDTestCase: # test fill null ## | {. | | .} | - tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(null)") + tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(null)") tdSql.checkRows(11) tdSql.checkData(0, 0, 5) tdSql.checkData(1, 0, None) @@ -881,7 +891,7 @@ class TDTestCase: # test fill value ## | {. | | .} | - tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(value, 1)") + tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(value, 1)") tdSql.checkRows(11) tdSql.checkData(0, 0, 5) tdSql.checkData(1, 0, 1) @@ -895,7 +905,7 @@ class TDTestCase: tdSql.checkData(9, 0, 1) tdSql.checkData(10, 0, 15) - ## | . | {} | . | + # | . | {} | . | tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-03 00:00:05', '2020-02-07 00:00:05') every(1d) fill(value, 1)") tdSql.checkRows(5) tdSql.checkData(0, 0, 1) @@ -928,7 +938,7 @@ class TDTestCase: # test fill prev ## | {. | | .} | - tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(prev)") + tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(prev)") tdSql.checkRows(11) tdSql.checkData(0, 0, 5) tdSql.checkData(1, 0, 5) @@ -973,7 +983,7 @@ class TDTestCase: # test fill next ## | {. | | .} | - tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(next)") + tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(next)") tdSql.checkRows(11) tdSql.checkData(0, 0, 5) tdSql.checkData(1, 0, 15) @@ -1015,7 +1025,7 @@ class TDTestCase: # test fill linear ## | {. | | .} | - tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(linear)") + tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(linear)") tdSql.checkRows(11) tdSql.checkData(0, 0, 5) tdSql.checkData(1, 0, 6) @@ -2393,13 +2403,159 @@ class TDTestCase: tdLog.printNoPrefix("==========step13:stable cases") - tdSql.error(f"select interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(null)") - #tdSql.checkRows(13) + # select interp from supertable + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)") + tdSql.checkRows(19) + + tdSql.checkData(0, 2, None) + tdSql.checkData(1, 2, 1) + tdSql.checkData(2, 2, None) + tdSql.checkData(3, 2, 3) + tdSql.checkData(4, 2, None) + tdSql.checkData(5, 2, 5) + tdSql.checkData(6, 2, None) + tdSql.checkData(7, 2, 7) + tdSql.checkData(8, 2, None) + tdSql.checkData(9, 2, 9) + tdSql.checkData(10, 2, None) + tdSql.checkData(11, 2, 11) + tdSql.checkData(12, 2, None) + tdSql.checkData(13, 2, 13) + tdSql.checkData(14, 2, None) + tdSql.checkData(15, 2, 15) + tdSql.checkData(16, 2, None) + tdSql.checkData(17, 2, 17) + tdSql.checkData(18, 2, None) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(value, 0)") + tdSql.checkRows(19) + + tdSql.checkData(0, 2, 0) + tdSql.checkData(1, 2, 1) + tdSql.checkData(2, 2, 0) + tdSql.checkData(3, 2, 3) + tdSql.checkData(4, 2, 0) + tdSql.checkData(5, 2, 5) + tdSql.checkData(6, 2, 0) + tdSql.checkData(7, 2, 7) + tdSql.checkData(8, 2, 0) + tdSql.checkData(9, 2, 9) + tdSql.checkData(10, 2, 0) + tdSql.checkData(11, 2, 11) + tdSql.checkData(12, 2, 0) + tdSql.checkData(13, 2, 13) + tdSql.checkData(14, 2, 0) + tdSql.checkData(15, 2, 15) + tdSql.checkData(16, 2, 0) + tdSql.checkData(17, 2, 17) + tdSql.checkData(18, 2, 0) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(prev)") + tdSql.checkRows(18) + + tdSql.checkData(0, 0, '2020-02-01 00:00:01.000') + tdSql.checkData(0, 1, False) + + tdSql.checkData(0, 2, 1) + tdSql.checkData(1, 2, 1) + tdSql.checkData(2, 2, 3) + tdSql.checkData(3, 2, 3) + tdSql.checkData(4, 2, 5) + tdSql.checkData(5, 2, 5) + tdSql.checkData(6, 2, 7) + tdSql.checkData(7, 2, 7) + tdSql.checkData(8, 2, 9) + tdSql.checkData(9, 2, 9) + tdSql.checkData(10, 2, 11) + tdSql.checkData(11, 2, 11) + tdSql.checkData(12, 2, 13) + tdSql.checkData(13, 2, 13) + tdSql.checkData(14, 2, 15) + tdSql.checkData(15, 2, 15) + tdSql.checkData(16, 2, 17) + tdSql.checkData(17, 2, 17) + + tdSql.checkData(17, 0, '2020-02-01 00:00:18.000') + tdSql.checkData(17, 1, True) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)") + tdSql.checkRows(18) + + tdSql.checkData(0, 0, '2020-02-01 00:00:00.000') + tdSql.checkData(0, 1, True) + + tdSql.checkData(0, 2, 1) + tdSql.checkData(1, 2, 1) + tdSql.checkData(2, 2, 3) + tdSql.checkData(3, 2, 3) + tdSql.checkData(4, 2, 5) + tdSql.checkData(5, 2, 5) + tdSql.checkData(6, 2, 7) + tdSql.checkData(7, 2, 7) + tdSql.checkData(8, 2, 9) + tdSql.checkData(9, 2, 9) + tdSql.checkData(10, 2, 11) + tdSql.checkData(11, 2, 11) + tdSql.checkData(12, 2, 13) + tdSql.checkData(13, 2, 13) + tdSql.checkData(14, 2, 15) + tdSql.checkData(15, 2, 15) + tdSql.checkData(16, 2, 17) + tdSql.checkData(17, 2, 17) + + tdSql.checkData(17, 0, '2020-02-01 00:00:17.000') + tdSql.checkData(17, 1, False) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(17) + + tdSql.checkData(0, 2, 1) + tdSql.checkData(1, 2, 2) + tdSql.checkData(2, 2, 3) + tdSql.checkData(3, 2, 4) + tdSql.checkData(4, 2, 5) + tdSql.checkData(5, 2, 6) + tdSql.checkData(6, 2, 7) + tdSql.checkData(7, 2, 8) + tdSql.checkData(8, 2, 9) + tdSql.checkData(9, 2, 10) + tdSql.checkData(10, 2, 11) + tdSql.checkData(11, 2, 12) + tdSql.checkData(12, 2, 13) + tdSql.checkData(13, 2, 14) + tdSql.checkData(14, 2, 15) + tdSql.checkData(15, 2, 16) + tdSql.checkData(16, 2, 17) + + # select interp from supertable partition by + + tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)") + tdSql.checkRows(57) + + tdSql.checkData(0, 2, None) + tdSql.checkData(1, 2, 1) + tdSql.checkData(2, 2, None) + tdSql.checkData(3, 2, 3) + tdSql.checkData(4, 2, None) + tdSql.checkData(5, 2, 5) + tdSql.checkData(6, 2, None) + tdSql.checkData(7, 2, 7) + tdSql.checkData(8, 2, None) + tdSql.checkData(9, 2, 9) + tdSql.checkData(10, 2, None) + tdSql.checkData(11, 2, 11) + tdSql.checkData(12, 2, None) + tdSql.checkData(13, 2, 13) + tdSql.checkData(14, 2, None) + tdSql.checkData(15, 2, 15) + tdSql.checkData(16, 2, None) + tdSql.checkData(17, 2, 17) + tdSql.checkData(18, 2, None) #tdSql.query(f"select interp(c0) from {dbname}.{ctbname1} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(null)") #tdSql.checkRows(13) - tdSql.error(f"select interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1s) fill(null)") + #tdSql.error(f"select interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1s) fill(null)") #tdSql.checkRows(13) #tdSql.query(f"select _irowts,interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1h) fill(prev)") From 5a1681d91d3571837a83a7b1b9f8ef0837eff352 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 5 May 2023 15:24:15 +0800 Subject: [PATCH 11/22] fix data block not properly saved --- source/libs/executor/src/timesliceoperator.c | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 310c3c5674..5110712c80 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -39,7 +39,7 @@ typedef struct STimeSliceOperatorInfo { int64_t prevTs; bool prevTsSet; uint64_t groupId; - SSDataBlock* pCurrentGroupRes; + SSDataBlock* pPrevGroupRes; SSDataBlock* pNextGroupRes; } STimeSliceOperatorInfo; @@ -640,6 +640,11 @@ static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperato } } +static void copyPrevGroupDataBlock(SSDataBlock* pDstBlock, SSDataBlock* pSrcBlock) { + blockDataCleanup(pDstBlock); + assignOneDataBlock(pDstBlock, pSrcBlock); +} + static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) { pSliceInfo->current = pSliceInfo->win.skey; pSliceInfo->prevTsSet = false; @@ -667,7 +672,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pSliceInfo->pNextGroupRes != NULL) { setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true); doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo); - pSliceInfo->pCurrentGroupRes = pSliceInfo->pNextGroupRes; + copyPrevGroupDataBlock(pSliceInfo->pPrevGroupRes, pSliceInfo->pNextGroupRes); pSliceInfo->pNextGroupRes = NULL; } @@ -701,13 +706,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo); - pSliceInfo->pCurrentGroupRes = pBlock; + copyPrevGroupDataBlock(pSliceInfo->pPrevGroupRes, pBlock); } // check if need to interpolate after last datablock // except for fill(next), fill(linear) - genInterpAfterDataBlock(pSliceInfo, pOperator, pSliceInfo->pCurrentGroupRes, - pSliceInfo->pCurrentGroupRes->info.rows - 1); + genInterpAfterDataBlock(pSliceInfo, pOperator, pSliceInfo->pPrevGroupRes, + pSliceInfo->pPrevGroupRes->info.rows - 1); doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); if (pOperator->status == OP_EXEC_DONE) { @@ -771,8 +776,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->prevTsSet = false; pInfo->prevTs = 0; pInfo->groupId = 0; + pInfo->pPrevGroupRes = createDataBlock(); pInfo->pNextGroupRes = NULL; - pInfo->pCurrentGroupRes = NULL; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; @@ -801,6 +806,7 @@ void destroyTimeSliceOperatorInfo(void* param) { STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param; pInfo->pRes = blockDataDestroy(pInfo->pRes); + pInfo->pPrevGroupRes = blockDataDestroy(pInfo->pPrevGroupRes); for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) { SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i); From a3b84b05819363ea26205ae0006dbf24b3e2ea78 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 5 May 2023 15:43:04 +0800 Subject: [PATCH 12/22] add more cases --- tests/system-test/2-query/interp.py | 396 ++++++++++++++++++++++++++-- 1 file changed, 369 insertions(+), 27 deletions(-) diff --git a/tests/system-test/2-query/interp.py b/tests/system-test/2-query/interp.py index 166fd6b686..d066cf87e8 100644 --- a/tests/system-test/2-query/interp.py +++ b/tests/system-test/2-query/interp.py @@ -24,6 +24,7 @@ class TDTestCase: ctbname1 = "ctb1" ctbname2 = "ctb2" ctbname3 = "ctb3" + num_of_ctables = 3 tdSql.prepare() @@ -2401,7 +2402,7 @@ class TDTestCase: - tdLog.printNoPrefix("==========step13:stable cases") + tdLog.printNoPrefix("==========step13:test stable cases") # select interp from supertable tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)") @@ -2527,39 +2528,380 @@ class TDTestCase: tdSql.checkData(15, 2, 16) tdSql.checkData(16, 2, 17) - # select interp from supertable partition by + # select interp from supertable partition by tbname tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)") + + point_idx = {1, 7, 13, 22, 28, 34, 43, 49, 55} + point_dict = {1:1, 7:7, 13:13, 22:3, 28:9, 34:15, 43:5, 49:11, 55:17} + rows_per_partition = 19 + tdSql.checkRows(rows_per_partition * num_of_ctables) + for i in range(num_of_ctables): + for j in range(rows_per_partition): + row = j + i * rows_per_partition + tdSql.checkData(row, 0, f'ctb{i + 1}') + tdSql.checkData(j, 1, f'2020-02-01 00:00:{j}.000') + if row in point_idx: + tdSql.checkData(row, 2, False) + else: + tdSql.checkData(row, 2, True) + + if row in point_idx: + tdSql.checkData(row, 3, point_dict[row]) + else: + tdSql.checkData(row, 3, None) + + tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(value, 0)") + + point_idx = {1, 7, 13, 22, 28, 34, 43, 49, 55} + point_dict = {1:1, 7:7, 13:13, 22:3, 28:9, 34:15, 43:5, 49:11, 55:17} + rows_per_partition = 19 + tdSql.checkRows(rows_per_partition * num_of_ctables) + for i in range(num_of_ctables): + for j in range(rows_per_partition): + row = j + i * rows_per_partition + tdSql.checkData(row, 0, f'ctb{i + 1}') + tdSql.checkData(j, 1, f'2020-02-01 00:00:{j}.000') + if row in point_idx: + tdSql.checkData(row, 2, False) + else: + tdSql.checkData(row, 2, True) + + if row in point_idx: + tdSql.checkData(row, 3, point_dict[row]) + else: + tdSql.checkData(row, 3, 0) + + tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(prev)") + + tdSql.checkRows(48) + for i in range(0, 18): + tdSql.checkData(i, 0, 'ctb1') + + for i in range(18, 34): + tdSql.checkData(i, 0, 'ctb2') + + for i in range(34, 48): + tdSql.checkData(i, 0, 'ctb3') + + tdSql.checkData(0, 1, '2020-02-01 00:00:01.000') + tdSql.checkData(17, 1, '2020-02-01 00:00:18.000') + + tdSql.checkData(18, 1, '2020-02-01 00:00:03.000') + tdSql.checkData(33, 1, '2020-02-01 00:00:18.000') + + tdSql.checkData(34, 1, '2020-02-01 00:00:05.000') + tdSql.checkData(47, 1, '2020-02-01 00:00:18.000') + + for i in range(0, 6): + tdSql.checkData(i, 3, 1) + + for i in range(6, 12): + tdSql.checkData(i, 3, 7) + + for i in range(12, 18): + tdSql.checkData(i, 3, 13) + + for i in range(18, 24): + tdSql.checkData(i, 3, 3) + + for i in range(24, 30): + tdSql.checkData(i, 3, 9) + + for i in range(30, 34): + tdSql.checkData(i, 3, 15) + + for i in range(34, 40): + tdSql.checkData(i, 3, 5) + + for i in range(40, 46): + tdSql.checkData(i, 3, 11) + + for i in range(46, 48): + tdSql.checkData(i, 3, 17) + + tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)") + + tdSql.checkRows(48) + for i in range(0, 14): + tdSql.checkData(i, 0, 'ctb1') + + for i in range(14, 30): + tdSql.checkData(i, 0, 'ctb2') + + for i in range(30, 48): + tdSql.checkData(i, 0, 'ctb3') + + tdSql.checkData(0, 1, '2020-02-01 00:00:00.000') + tdSql.checkData(13, 1, '2020-02-01 00:00:13.000') + + tdSql.checkData(14, 1, '2020-02-01 00:00:00.000') + tdSql.checkData(29, 1, '2020-02-01 00:00:15.000') + + tdSql.checkData(30, 1, '2020-02-01 00:00:00.000') + tdSql.checkData(47, 1, '2020-02-01 00:00:17.000') + + for i in range(0, 2): + tdSql.checkData(i, 3, 1) + + for i in range(2, 8): + tdSql.checkData(i, 3, 7) + + for i in range(8, 14): + tdSql.checkData(i, 3, 13) + + for i in range(14, 18): + tdSql.checkData(i, 3, 3) + + for i in range(18, 24): + tdSql.checkData(i, 3, 9) + + for i in range(24, 30): + tdSql.checkData(i, 3, 15) + + for i in range(30, 36): + tdSql.checkData(i, 3, 5) + + for i in range(36, 42): + tdSql.checkData(i, 3, 11) + + for i in range(42, 48): + tdSql.checkData(i, 3, 17) + + tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + + tdSql.checkRows(39) + for i in range(0, 13): + tdSql.checkData(i, 0, 'ctb1') + + for i in range(13, 26): + tdSql.checkData(i, 0, 'ctb2') + + for i in range(26, 39): + tdSql.checkData(i, 0, 'ctb3') + + tdSql.checkData(0, 1, '2020-02-01 00:00:01.000') + tdSql.checkData(12, 1, '2020-02-01 00:00:13.000') + + tdSql.checkData(13, 1, '2020-02-01 00:00:03.000') + tdSql.checkData(25, 1, '2020-02-01 00:00:15.000') + + tdSql.checkData(26, 1, '2020-02-01 00:00:05.000') + tdSql.checkData(38, 1, '2020-02-01 00:00:17.000') + + for i in range(0, 13): + tdSql.checkData(i, 3, i + 1) + + for i in range(13, 26): + tdSql.checkData(i, 3, i - 10) + + for i in range(26, 39): + tdSql.checkData(i, 3, i - 21) + + # select interp from supertable partition by column + + tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)") + tdSql.checkRows(171) + + tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(value, 0)") + tdSql.checkRows(171) + + tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(prev)") + tdSql.checkRows(90) + + tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)") + tdSql.checkRows(90) + + tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(9) + + # select interp from supertable partition by tag + + tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)") tdSql.checkRows(57) - tdSql.checkData(0, 2, None) - tdSql.checkData(1, 2, 1) - tdSql.checkData(2, 2, None) - tdSql.checkData(3, 2, 3) - tdSql.checkData(4, 2, None) - tdSql.checkData(5, 2, 5) - tdSql.checkData(6, 2, None) - tdSql.checkData(7, 2, 7) - tdSql.checkData(8, 2, None) - tdSql.checkData(9, 2, 9) - tdSql.checkData(10, 2, None) - tdSql.checkData(11, 2, 11) - tdSql.checkData(12, 2, None) - tdSql.checkData(13, 2, 13) - tdSql.checkData(14, 2, None) - tdSql.checkData(15, 2, 15) - tdSql.checkData(16, 2, None) - tdSql.checkData(17, 2, 17) - tdSql.checkData(18, 2, None) + tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(value, 0)") + tdSql.checkRows(57) - #tdSql.query(f"select interp(c0) from {dbname}.{ctbname1} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(null)") - #tdSql.checkRows(13) + tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(prev)") + tdSql.checkRows(48) - #tdSql.error(f"select interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1s) fill(null)") - #tdSql.checkRows(13) + tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)") + tdSql.checkRows(48) + + tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(39) + + # select interp from supertable filter + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where ts between '2020-02-01 00:00:01.000' and '2020-02-01 00:00:13.000' range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(13) + + for i in range(13): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where c0 <= 13 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(13) + + for i in range(13): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where t1 = 1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(13) + + for i in range(13): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where tbname = 'ctb1' range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(13) + + for i in range(13): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where ts between '2020-02-01 00:00:01.000' and '2020-02-01 00:00:13.000' partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(27) + + for i in range(13): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where c0 <= 13 partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(27) + + for i in range(13): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where t1 = 1 partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(13) + + for i in range(13): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where tbname = 'ctb1' partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(13) + + for i in range(13): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + # select interp from supertable filter limit + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 13") + tdSql.checkRows(13) + + for i in range(13): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 20") + tdSql.checkRows(17) + + for i in range(17): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where ts between '2020-02-01 00:00:01.000' and '2020-02-01 00:00:13.000' range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10") + tdSql.checkRows(10) + + for i in range(10): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where c0 <= 13 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10") + tdSql.checkRows(10) + + for i in range(10): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where t1 = 1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10") + tdSql.checkRows(10) + + for i in range(10): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where tbname = 'ctb1' range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10") + tdSql.checkRows(10) + + for i in range(10): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 13") + tdSql.checkRows(13) + + for i in range(13): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 40") + tdSql.checkRows(39) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where ts between '2020-02-01 00:00:01.000' and '2020-02-01 00:00:13.000' partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10") + tdSql.checkRows(10) + + for i in range(10): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where c0 <= 13 partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10") + tdSql.checkRows(10) + + for i in range(10): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where t1 = 1 partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10") + tdSql.checkRows(10) + + for i in range(10): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where tbname = 'ctb1' partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10") + tdSql.checkRows(10) + + for i in range(10): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 1) + + # select interp from supertable with scalar expression + + tdSql.query(f"select _irowts, _isfilled, interp(1 + 1) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(17) + + for i in range(17): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, 2.0) + + tdSql.query(f"select _irowts, _isfilled, interp(c0 + 1) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(17) + + for i in range(17): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, i + 2) + + tdSql.query(f"select _irowts, _isfilled, interp(c0 * 2) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(17) + + for i in range(17): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, (i + 1) * 2) + + tdSql.query(f"select _irowts, _isfilled, interp(c0 + c1) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(17) + + for i in range(17): + tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') + tdSql.checkData(i, 2, (i + 1) * 2) - #tdSql.query(f"select _irowts,interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1h) fill(prev)") - #tdSql.query(f"select tbname,_irowts,interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1h) fill(prev)") tdLog.printNoPrefix("======step 14: test interp pseudo columns") tdSql.error(f"select _irowts, c6 from {dbname}.{tbname}") From 0aec541c354155ac30638dc8d0f4c633ab715879 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 5 May 2023 17:34:16 +0800 Subject: [PATCH 13/22] add cn docs --- docs/zh/12-taos-sql/10-function.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md index 50e82e6b90..458fc9c7a2 100644 --- a/docs/zh/12-taos-sql/10-function.md +++ b/docs/zh/12-taos-sql/10-function.md @@ -888,7 +888,7 @@ INTERP(expr) - INTERP 的输出时间范围根据 RANGE(timestamp1,timestamp2)字段来指定,需满足 timestamp1 <= timestamp2。其中 timestamp1(必选值)为输出时间范围的起始值,即如果 timestamp1 时刻符合插值条件则 timestamp1 为输出的第一条记录,timestamp2(必选值)为输出时间范围的结束值,即输出的最后一条记录的 timestamp 不能大于 timestamp2。 - INTERP 根据 EVERY(time_unit) 字段来确定输出时间范围内的结果条数,即从 timestamp1 开始每隔固定长度的时间(time_unit 值)进行插值,time_unit 可取值时间单位:1a(毫秒),1s(秒),1m(分),1h(小时),1d(天),1w(周)。例如 EVERY(500a) 将对于指定数据每500毫秒间隔进行一次插值. - INTERP 根据 FILL 字段来决定在每个符合输出条件的时刻如何进行插值。关于 FILL 子句如何使用请参考 [FILL 子句](../distinguished/#fill-子句) -- INTERP 只能在一个时间序列内进行插值,因此当作用于超级表时必须跟 partition by tbname 一起使用。 +- INTERP 作用于超级表时, 会将该超级表下的所有子表数据按照主键列排序后进行插值计算,也可以搭配 PARTITION BY tbname 使用,将结果强制规约到单个时间线。 - INTERP 可以与伪列 _irowts 一起使用,返回插值点所对应的时间戳(3.0.2.0版本以后支持)。 - INTERP 可以与伪列 _isfilled 一起使用,显示返回结果是否为原始记录或插值算法产生的数据(3.0.3.0版本以后支持)。 From 77dc97e509254153f8593bc0c0467135f940270e Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 5 May 2023 17:54:16 +0800 Subject: [PATCH 14/22] add en doc --- docs/en/12-taos-sql/10-function.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index 5c1a833e05..09ffcce33f 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -886,7 +886,7 @@ INTERP(expr) - The output time range of `INTERP` is specified by `RANGE(timestamp1,timestamp2)` parameter, with timestamp1 <= timestamp2. timestamp1 is the starting point of the output time range and must be specified. timestamp2 is the ending point of the output time range and must be specified. - The number of rows in the result set of `INTERP` is determined by the parameter `EVERY(time_unit)`. Starting from timestamp1, one interpolation is performed for every time interval specified `time_unit` parameter. The parameter `time_unit` must be an integer, with no quotes, with a time unit of: a(millisecond)), s(second), m(minute), h(hour), d(day), or w(week). For example, `EVERY(500a)` will interpolate every 500 milliseconds. - Interpolation is performed based on `FILL` parameter. For more information about FILL clause, see [FILL Clause](../distinguished/#fill-clause). -- `INTERP` can only be used to interpolate in single timeline. So it must be used with `partition by tbname` when it's used on a STable. +- `INTERP` can be applied to supertable by interpolating primary key sorted data of all its childtables. It can also be used with `partition by tbname` when applied to supertable to generate interpolation on each single timeline. - Pseudocolumn `_irowts` can be used along with `INTERP` to return the timestamps associated with interpolation points(support after version 3.0.2.0). - Pseudocolumn `_isfilled` can be used along with `INTERP` to indicate whether the results are original records or data points generated by interpolation algorithm(support after version 3.0.3.0). From b5026a8775f0131b6a40ba5a0079125f36df347b Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 6 May 2023 10:34:20 +0800 Subject: [PATCH 15/22] add duplicate timestamp case --- tests/system-test/2-query/interp.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/system-test/2-query/interp.py b/tests/system-test/2-query/interp.py index d066cf87e8..87d84133ae 100644 --- a/tests/system-test/2-query/interp.py +++ b/tests/system-test/2-query/interp.py @@ -2902,6 +2902,14 @@ class TDTestCase: tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000') tdSql.checkData(i, 2, (i + 1) * 2) + # check duplicate timestamp + + # add duplicate timestamp for different child tables + tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')") + + tdSql.error(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)") + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)") + tdLog.printNoPrefix("======step 14: test interp pseudo columns") tdSql.error(f"select _irowts, c6 from {dbname}.{tbname}") From 07f714cc8414d2ac4cb3e02d6eb6b11c63da5245 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 6 May 2023 15:06:54 +0800 Subject: [PATCH 16/22] fix doc error --- docs/en/12-taos-sql/06-select.md | 2 +- docs/zh/12-taos-sql/06-select.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/12-taos-sql/06-select.md b/docs/en/12-taos-sql/06-select.md index de7294f7a9..070fd41653 100644 --- a/docs/en/12-taos-sql/06-select.md +++ b/docs/en/12-taos-sql/06-select.md @@ -55,7 +55,7 @@ window_clause: { | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)] interp_clause: - RANGE(ts_val, ts_val), EVERY(every_val), FILL(fill_mod_and_val) + RANGE(ts_val, ts_val) EVERY(every_val) FILL(fill_mod_and_val) partition_by_clause: PARTITION BY expr [, expr] ... diff --git a/docs/zh/12-taos-sql/06-select.md b/docs/zh/12-taos-sql/06-select.md index e4ba5b76e4..870df73471 100644 --- a/docs/zh/12-taos-sql/06-select.md +++ b/docs/zh/12-taos-sql/06-select.md @@ -55,7 +55,7 @@ window_clause: { | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)] interp_clause: - RANGE(ts_val, ts_val), EVERY(every_val), FILL(fill_mod_and_val) + RANGE(ts_val, ts_val) EVERY(every_val) FILL(fill_mod_and_val) partition_by_clause: PARTITION BY expr [, expr] ... From 3b10ac29cebe7f430d42877d0fc6ecfd997a36c7 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 9 May 2023 11:09:29 +0800 Subject: [PATCH 17/22] use functionType instead of functionName --- source/libs/executor/src/timesliceoperator.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 5110712c80..7a4b9014bc 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -194,13 +194,13 @@ static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumn } static bool isInterpFunc(SExprInfo* pExprInfo) { - char *name = pExprInfo->pExpr->_function.functionName; - return (strcasecmp(name, "interp") == 0); + int32_t functionType = pExprInfo->pExpr->_function.functionType; + return (functionType == FUNCTION_TYPE_INTERP); } static bool isGroupKeyFunc(SExprInfo* pExprInfo) { - char *name = pExprInfo->pExpr->_function.functionName; - return (strcasecmp(name, "_group_key") == 0); + int32_t functionType = pExprInfo->pExpr->_function.functionType; + return (functionType == FUNCTION_TYPE_GROUP_KEY); } static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, From 4693c2297037eb3e5ad1b5b2ab80bf007f4969dd Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 9 May 2023 14:06:09 +0800 Subject: [PATCH 18/22] optimize group key storage --- source/libs/executor/src/timesliceoperator.c | 108 +++++++++++++++---- 1 file changed, 85 insertions(+), 23 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 7a4b9014bc..78aabb5c26 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -39,7 +39,7 @@ typedef struct STimeSliceOperatorInfo { int64_t prevTs; bool prevTsSet; uint64_t groupId; - SSDataBlock* pPrevGroupRes; + SGroupKeys* pPrevGroupKey; SSDataBlock* pNextGroupRes; } STimeSliceOperatorInfo; @@ -227,16 +227,26 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp continue; } else if (!isInterpFunc(pExprInfo)) { if (isGroupKeyFunc(pExprInfo)) { - int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; - SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); + if (pSrcBlock != NULL) { + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); - if (colDataIsNull_s(pSrc, index)) { - colDataSetNULL(pDst, pResBlock->info.rows); - continue; + if (colDataIsNull_s(pSrc, index)) { + colDataSetNULL(pDst, pResBlock->info.rows); + continue; + } + + char* v = colDataGetData(pSrc, index); + colDataSetVal(pDst, pResBlock->info.rows, v, false); + } else { + // use stored group key + SGroupKeys* pkey = pSliceInfo->pPrevGroupKey; + if (pkey->isNull == false) { + colDataSetVal(pDst, rows, pkey->pData, false); + } else { + colDataSetNULL(pDst, rows); + } } - - char* v = colDataGetData(pSrc, index); - colDataSetVal(pDst, pResBlock->info.rows, v, false); } continue; } @@ -464,7 +474,31 @@ static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB return TSDB_CODE_SUCCESS; } -static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { +static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExprSup) { + if (pInfo->pPrevGroupKey != NULL) { + return TSDB_CODE_SUCCESS; + } + + pInfo->pPrevGroupKey = taosMemoryCalloc(1, sizeof(SGroupKeys)); + if (pInfo->pPrevGroupKey == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { + SExprInfo* pExprInfo = &pExprSup->pExprInfo[i]; + + if (isGroupKeyFunc(pExprInfo)) { + pInfo->pPrevGroupKey->bytes = pExprInfo->base.resSchema.bytes; + pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type; + pInfo->pPrevGroupKey->isNull = false; + pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes); + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock, SExprSupp* pExprSup) { int32_t code; code = initPrevRowsKeeper(pInfo, pBlock); if (code != TSDB_CODE_SUCCESS) { @@ -481,6 +515,12 @@ static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock return TSDB_CODE_FAILED; } + code = initGroupKeyKeeper(pInfo, pExprSup); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_FAILED; + } + + return TSDB_CODE_SUCCESS; } @@ -627,22 +667,42 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS } } -static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, SSDataBlock* pSrcBlock, - int32_t index) { +static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) { SSDataBlock* pResBlock = pSliceInfo->pRes; SInterval* pInterval = &pSliceInfo->interval; while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && pSliceInfo->fillType != TSDB_FILL_LINEAR) { - genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pSrcBlock, index, false); + genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false); pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); } } -static void copyPrevGroupDataBlock(SSDataBlock* pDstBlock, SSDataBlock* pSrcBlock) { - blockDataCleanup(pDstBlock); - assignOneDataBlock(pDstBlock, pSrcBlock); +static void copyPrevGroupKey(SExprSupp* pExprSup, SGroupKeys* pGroupKey, SSDataBlock* pSrcBlock) { + for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { + SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; + + if (isGroupKeyFunc(pExprInfo)) { + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); + + if (colDataIsNull_s(pSrc, 0)) { + pGroupKey->isNull = true; + break; + } + + char* v = colDataGetData(pSrc, 0); + if (IS_VAR_DATA_TYPE(pGroupKey->type)) { + memcpy(pGroupKey->pData, v, varDataTLen(v)); + } else { + memcpy(pGroupKey->pData, v, pGroupKey->bytes); + } + + pGroupKey->isNull = false; + break; + } + } } static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) { @@ -672,7 +732,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pSliceInfo->pNextGroupRes != NULL) { setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true); doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo); - copyPrevGroupDataBlock(pSliceInfo->pPrevGroupRes, pSliceInfo->pNextGroupRes); + copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pSliceInfo->pNextGroupRes); pSliceInfo->pNextGroupRes = NULL; } @@ -698,7 +758,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); } - int32_t code = initKeeperInfo(pSliceInfo, pBlock); + int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -706,13 +766,12 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo); - copyPrevGroupDataBlock(pSliceInfo->pPrevGroupRes, pBlock); + copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock); } // check if need to interpolate after last datablock // except for fill(next), fill(linear) - genInterpAfterDataBlock(pSliceInfo, pOperator, pSliceInfo->pPrevGroupRes, - pSliceInfo->pPrevGroupRes->info.rows - 1); + genInterpAfterDataBlock(pSliceInfo, pOperator, 0); doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); if (pOperator->status == OP_EXEC_DONE) { @@ -776,7 +835,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->prevTsSet = false; pInfo->prevTs = 0; pInfo->groupId = 0; - pInfo->pPrevGroupRes = createDataBlock(); + pInfo->pPrevGroupKey = NULL; pInfo->pNextGroupRes = NULL; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { @@ -806,7 +865,6 @@ void destroyTimeSliceOperatorInfo(void* param) { STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param; pInfo->pRes = blockDataDestroy(pInfo->pRes); - pInfo->pPrevGroupRes = blockDataDestroy(pInfo->pPrevGroupRes); for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) { SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i); @@ -826,6 +884,10 @@ void destroyTimeSliceOperatorInfo(void* param) { taosMemoryFree(pKey->end.val); } taosArrayDestroy(pInfo->pLinearInfo); + + taosMemoryFree(pInfo->pPrevGroupKey->pData); + taosMemoryFree(pInfo->pPrevGroupKey); + cleanupExprSupp(&pInfo->scalarSup); for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) { From e1140d4c1566973cbb5b32d28c0a2226b9de0fff Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 9 May 2023 15:05:41 +0800 Subject: [PATCH 19/22] optimize duplicate timestamp check --- source/libs/executor/src/timesliceoperator.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 78aabb5c26..f54f54be37 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -176,6 +176,10 @@ static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumn int64_t currentTs = *(int64_t*)colDataGetData(pTsCol, curIndex); + if (currentTs > pSliceInfo->win.ekey) { + return false; + } + if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevTs)) { return true; } @@ -183,7 +187,7 @@ static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumn pSliceInfo->prevTsSet = true; pSliceInfo->prevTs = currentTs; - if (curIndex < rows - 1) { + if (currentTs == pSliceInfo->win.ekey && curIndex < rows - 1) { int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1); if (currentTs == nextTs) { return true; From 6afc40110d90ca95e8506abad5fe81c15fcb8ec3 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 9 May 2023 15:06:06 +0800 Subject: [PATCH 20/22] add dup ts check case --- tests/system-test/2-query/interp.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/system-test/2-query/interp.py b/tests/system-test/2-query/interp.py index 87d84133ae..cdbfa0de84 100644 --- a/tests/system-test/2-query/interp.py +++ b/tests/system-test/2-query/interp.py @@ -2907,6 +2907,8 @@ class TDTestCase: # add duplicate timestamp for different child tables tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')") + tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:14') every(1s) fill(null)") + tdSql.error(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:15') every(1s) fill(null)") tdSql.error(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)") tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)") From 523f08fe4b4973804388936689ebf5d9503a7128 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 10 May 2023 11:18:42 +0800 Subject: [PATCH 21/22] return result after finish processing of each group --- source/libs/executor/src/timesliceoperator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index f54f54be37..15a9ad9594 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -784,6 +784,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // restore initial value for next group resetTimesliceInfo(pSliceInfo); + break; } // restore the value From 121564de2fe2079f181f1720396461d434863b5c Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 11 May 2023 10:04:25 +0800 Subject: [PATCH 22/22] add threshold for group result --- source/libs/executor/src/timesliceoperator.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 15a9ad9594..b7f7a44080 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -784,7 +784,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // restore initial value for next group resetTimesliceInfo(pSliceInfo); - break; + if (pResBlock->info.rows >= 4096) { + break; + } } // restore the value