From 7522c2edf31b0d87601a40f6a00478921bacbdbc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 21 Jul 2022 20:44:36 +0800 Subject: [PATCH 1/2] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 20 ++++++++++++------- source/libs/executor/src/scanoperator.c | 8 ++------ source/libs/function/src/builtins.c | 2 +- .../script/tsim/sma/rsmaCreateInsertQuery.sim | 1 + 6 files changed, 19 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index bad1037123..ccca13e55c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -788,11 +788,11 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo); } colIndex += 1; + ASSERT(rowIndex == remain); } else { // the specified column does not exist in file block, fill with null data colDataAppendNNULL(pColData, 0, remain); } - ASSERT(rowIndex == remain); i += 1; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b36a5ebdd1..21068c68a4 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -869,7 +869,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, - SExecTaskInfo* pTaskInfo); + STimeWindowAggSupp* pTwAggSup, SExecTaskInfo* pTaskInfo); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c42d477b33..ee71c38ee4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3414,6 +3414,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows > pResultInfo->threshold || pResBlock->info.rows > 0) { + pResBlock->info.groupId = pInfo->curGroupId; return pResBlock; } @@ -3456,17 +3457,20 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { // 1. The result in current group not reach the threshold of output result, continue // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) { + pResBlock->info.groupId = pInfo->curGroupId; return pResBlock; } doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) { + pResBlock->info.groupId = pInfo->curGroupId; return pResBlock; } } else if (pInfo->existNewGroupBlock) { // try next group assert(pBlock != NULL); doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows > pResultInfo->threshold) { + pResBlock->info.groupId = pInfo->curGroupId; return pResBlock; } } else { @@ -3486,23 +3490,19 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { SSDataBlock* fillResult = NULL; while (true) { fillResult = doFillImpl(pOperator); - if (fillResult != NULL) { - doFilter(pInfo->pCondition, fillResult); - } - if (fillResult == NULL) { doSetOperatorCompleted(pOperator); break; } + doFilter(pInfo->pCondition, fillResult); if (fillResult->info.rows > 0) { break; } } if (fillResult != NULL) { - size_t rows = fillResult->info.rows; - pOperator->resultInfo.totalRows += rows; + pOperator->resultInfo.totalRows += fillResult->info.rows; } return fillResult; @@ -4444,6 +4444,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; + STimeWindowAggSupp aggSup = (STimeWindowAggSupp){ + .waterMark = pTableScanNode->watermark, + .calTrigger = pTableScanNode->triggerType, + .maxTs = INT64_MIN, + }; + if (pHandle->vnode) { int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); @@ -4454,7 +4460,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); - SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo); + SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, &aggSup, pTaskInfo); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 574aa648e5..ab62905c3f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1525,7 +1525,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { } SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, - SExecTaskInfo* pTaskInfo) { + STimeWindowAggSupp* pTwSup, SExecTaskInfo* pTaskInfo) { SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -1539,11 +1539,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pTagCond = pTagCond; - pInfo->twAggSup = (STimeWindowAggSupp){ - .waterMark = pTableScanNode->watermark, - .calTrigger = pTableScanNode->triggerType, - .maxTs = INT64_MIN, - }; + pInfo->twAggSup = *pTwSup; int32_t numOfCols = 0; pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index ec8e6b038e..1d95d58a57 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2088,7 +2088,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateApercentileMerge, .getEnvFunc = getApercentileFuncEnv, - .initFunc = functionSetup, + .initFunc = apercentileFunctionSetup, .processFunc = apercentileFunctionMerge, .finalizeFunc = apercentileFinalize, .invertFunc = NULL, diff --git a/tests/script/tsim/sma/rsmaCreateInsertQuery.sim b/tests/script/tsim/sma/rsmaCreateInsertQuery.sim index fb3503c841..bde56cb862 100644 --- a/tests/script/tsim/sma/rsmaCreateInsertQuery.sim +++ b/tests/script/tsim/sma/rsmaCreateInsertQuery.sim @@ -43,6 +43,7 @@ endi if $data01 != 1 then if $data01 != 10 then + print =============> $data01 print retention level 2 file result $data01 != 1 or 10 return -1 endi From 72bbc27e8c0ec6660a3b88aabc8ee51e25d4a84a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Jul 2022 10:50:54 +0800 Subject: [PATCH 2/2] fix(query): remove the invalid update ops. --- source/common/src/tdatablock.c | 5 +++++ source/libs/executor/src/executorimpl.c | 8 ++------ source/libs/function/src/builtinsimpl.c | 2 ++ 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 1792a18c07..f7b0da0014 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1107,6 +1107,11 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF void blockDataCleanup(SSDataBlock* pDataBlock) { pDataBlock->info.rows = 0; + pDataBlock->info.groupId = 0; + + pDataBlock->info.window.ekey = 0; + pDataBlock->info.window.skey = 0; + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ee71c38ee4..a930a7bb46 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1336,12 +1336,10 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { - if (pFilterNode == NULL) { - return; - } - if (pBlock->info.rows == 0) { + if (pFilterNode == NULL || pBlock->info.rows == 0) { return; } + SFilterInfo* filter = NULL; // todo move to the initialization function @@ -1358,8 +1356,6 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { filterFreeInfo(filter); extractQualifiedTupleByFilterResult(pBlock, rowRes, keep); - blockDataUpdateTsWindow(pBlock, 0); - taosMemoryFree(rowRes); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 176da0bb48..05f84df7f8 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2592,6 +2592,7 @@ int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo); if (pInfo->algo == APERCT_ALGO_TDIGEST) { + buildTDigestInfo(pInfo); if (pInfo->pTDigest->size > 0) { pInfo->result = tdigestQuantile(pInfo->pTDigest, pInfo->percent / 100); } else { // no need to free @@ -2599,6 +2600,7 @@ int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return TSDB_CODE_SUCCESS; } } else { + buildHistogramInfo(pInfo); if (pInfo->pHisto->numOfElems > 0) { qDebug("get the final res:%d, elements:%"PRId64", entry:%d", pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries);