From 2f5ed16cb801fee91709c203656c6e42bcdafb7b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 24 Oct 2022 13:26:54 +0800 Subject: [PATCH] fix(query): remove redundant ts cols in cache scan. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 11 +++- source/libs/executor/inc/executorimpl.h | 3 +- source/libs/executor/src/cachescanoperator.c | 61 ++++++++++++++----- source/libs/executor/src/executorimpl.c | 11 ++-- source/libs/executor/src/groupoperator.c | 6 +- source/libs/executor/src/joinoperator.c | 2 +- source/libs/executor/src/projectoperator.c | 4 +- source/libs/executor/src/scanoperator.c | 19 +++--- source/libs/executor/src/sortoperator.c | 6 +- source/libs/executor/src/tfill.c | 2 +- source/libs/executor/src/timewindowoperator.c | 25 ++++---- 11 files changed, 89 insertions(+), 61 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 83dcbc60c7..905150cab0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -37,6 +37,8 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea int32_t numOfRows = pBlock->info.rows; if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { + bool allNullRow = true; + for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]); @@ -46,12 +48,15 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea p->ts = pColVal->ts; p->bytes = TSDB_KEYSIZE; *(int64_t*)p->buf = pColVal->ts; + allNullRow = false; } else { int32_t slotId = slotIds[i]; SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); p->ts = pColVal->ts; p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); + allNullRow = p->isNull & allNullRow; + if (!p->isNull) { if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { varDataSetLen(p->buf, pColVal->colVal.value.nData); @@ -69,6 +74,8 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE); colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false); } + + pBlock->info.rows += allNullRow? 0:1; } else { ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)); @@ -96,9 +103,9 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea } } } - } - pBlock->info.rows += 1; + pBlock->info.rows += 1; + } } int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0237f64ecb..0aab75c655 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -889,8 +889,7 @@ typedef struct SJoinOperatorInfo { void doDestroyExchangeOperatorInfo(void* param); SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn, - __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode, - __optr_decode_fn_t decode, __optr_explain_fn_t explain); + __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_explain_fn_t explain); int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 8c76a3f69b..00a338cc41 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -27,7 +27,8 @@ static SSDataBlock* doScanCache(SOperatorInfo* pOperator); static void destroyLastrowScanOperator(void* param); -static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); +static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); +static SArray* removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SArray* pColMatchInfo); SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { @@ -40,12 +41,15 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe } pInfo->readHandle = *readHandle; - pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc); + + SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc; + pInfo->pRes = createResDataBlock(pDescNode); int32_t numOfCols = 0; - pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pScanNode->scan.node.pOutputDataBlockDesc, - &numOfCols, COL_MATCH_FROM_COL_ID); - code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds); + SArray* pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + pInfo->pColMatchInfo = removeRedundantTsCol(pScanNode, pColMatchInfo); + + code = extractCacheScanSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -72,11 +76,9 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe } if (pScanNode->scan.pScanPseudoCols != NULL) { - SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup; - - pPseudoExpr->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &pPseudoExpr->numOfExprs); - pPseudoExpr->pCtx = - createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset); + SExprSupp* p = &pInfo->pseudoExprSup; + p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs); + p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset); } pOperator->name = "LastrowScanOperator"; @@ -88,7 +90,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL); + createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, NULL, destroyLastrowScanOperator, NULL); pOperator->cost.openCost = 0; return pOperator; @@ -130,7 +132,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { // check for tag values int32_t resultRows = pInfo->pBufferredRes->info.rows; - ASSERT(resultRows == taosArrayGetSize(pInfo->pUidList)); + + // the results may be null, if last values are all null + ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList)); pInfo->indexOfBufferedRes = 0; } @@ -240,7 +244,7 @@ void destroyLastrowScanOperator(void* param) { taosMemoryFreeClear(param); } -int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) { +int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) { size_t numOfCols = taosArrayGetSize(pColMatchInfo); *pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t)); @@ -248,16 +252,18 @@ int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInf return TSDB_CODE_OUT_OF_MEMORY; } + SSchemaWrapper* pWrapper = pTaskInfo->schemaInfo.sw; + for (int32_t i = 0; i < numOfCols; ++i) { SColMatchInfo* pColMatch = taosArrayGet(pColMatchInfo, i); - for (int32_t j = 0; j < pTaskInfo->schemaInfo.sw->nCols; ++j) { - if (pColMatch->colId == pTaskInfo->schemaInfo.sw->pSchema[j].colId && + for (int32_t j = 0; j < pWrapper->nCols; ++j) { + if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { (*pSlotIds)[pColMatch->targetSlotId] = -1; break; } - if (pColMatch->colId == pTaskInfo->schemaInfo.sw->pSchema[j].colId) { + if (pColMatch->colId == pWrapper->pSchema[j].colId) { (*pSlotIds)[pColMatch->targetSlotId] = j; break; } @@ -265,4 +271,27 @@ int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInf } return TSDB_CODE_SUCCESS; +} + +SArray* removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SArray* pColMatchInfo) { + if (!pScanNode->ignoreNull) { // retrieve cached last value + return pColMatchInfo; + } + + SArray* pMatchInfo = taosArrayInit(taosArrayGetSize(pColMatchInfo), sizeof(SColMatchInfo)); + + for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { + SColMatchInfo* pColInfo = taosArrayGet(pColMatchInfo, i); + + int32_t slotId = pColInfo->targetSlotId; + SNodeList* pList = pScanNode->scan.node.pOutputDataBlockDesc->pSlots; + + SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId); + if (pDesc->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) { + taosArrayPush(pMatchInfo, pColInfo); + } + } + + taosArrayDestroy(pColMatchInfo); + return pMatchInfo; } \ No newline at end of file diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e55404935c..a1f7b700a4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -110,16 +110,13 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { } SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn, - __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode, - __optr_decode_fn_t decode, __optr_explain_fn_t explain) { + __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_explain_fn_t explain) { SOperatorFpSet fpSet = { ._openFn = openFn, .getNextFn = nextFn, .getStreamResFn = streamFn, .cleanupFn = cleanup, .closeFn = closeFn, - .encodeResultRow = encode, - .decodeResultRow = decode, .getExplainFn = explain, }; @@ -2305,7 +2302,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, - destroyExchangeOperatorInfo, NULL, NULL, NULL); + destroyExchangeOperatorInfo, NULL); return pOperator; _error: @@ -3081,7 +3078,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo, - aggEncodeResultRow, aggDecodeResultRow, NULL); + NULL); if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = downstream->info; @@ -3306,7 +3303,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroyFillOperatorInfo, NULL, NULL, NULL); + createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroyFillOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); return pOperator; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 60794fc22a..644f02f60d 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -443,7 +443,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, - destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); + destroyGroupOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -820,7 +820,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo, - NULL, NULL, NULL); + NULL); code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -1095,7 +1095,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, NULL, - destroyStreamPartitionOperatorInfo, NULL, NULL, NULL); + destroyStreamPartitionOperatorInfo, NULL); initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index eab0307e01..8dac44684f 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -116,7 +116,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t } pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL); + createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL); code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 56fd6a99ef..b3ea7a5573 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -105,7 +105,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, - destroyProjectOperatorInfo, NULL, NULL, NULL); + destroyProjectOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -405,7 +405,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL, - destroyIndefinitOperatorInfo, NULL, NULL, NULL); + destroyIndefinitOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4e1b07e662..5659b0354b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -781,7 +781,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, - NULL, NULL, getTableScannerExecInfo); + getTableScannerExecInfo); // for non-blocking operator, the open cost is always 0 pOperator->cost.openCost = 0; @@ -809,7 +809,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL); return pOperator; } @@ -930,7 +930,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, - destroyBlockDistScanOperatorInfo, NULL, NULL, NULL); + destroyBlockDistScanOperatorInfo, NULL); return pOperator; _error: @@ -2115,7 +2115,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL); return pOperator; _end: @@ -2295,7 +2295,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL, NULL, NULL); + createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL); return pOperator; @@ -3201,7 +3201,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL); + createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL); return pOperator; @@ -3338,7 +3338,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL); + createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL); return pOperator; @@ -3878,9 +3878,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pOperator->pTaskInfo = pTaskInfo; initResultSizeInfo(&pOperator->resultInfo, 1024); - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL, - NULL, getTableMergeScanExplainExecInfo); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, + destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo); pOperator->cost.openCost = 0; return pOperator; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index c00b5c4802..39987d4501 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -64,7 +64,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* // there are headers, so pageSize = rowSize + header pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer - pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, + pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, getExplainExecInfo); int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -515,7 +515,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo, - NULL, NULL, getGroupSortExplainExecInfo); + getGroupSortExplainExecInfo); int32_t code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -752,7 +752,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, NULL, - destroyMultiwayMergeOperatorInfo, NULL, NULL, getMultiwayMergeExplainExecInfo); + destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo); code = appendDownstream(pOperator, downStreams, numStreams); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index d1d22dc3e5..99267e55e6 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -1694,7 +1694,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, NULL, destroyStreamFillOperatorInfo, - NULL, NULL, NULL); + NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6dda6488a0..8e5f03c4ac 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1831,8 +1831,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL, - destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); + pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL, destroyIntervalOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -2638,8 +2637,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); @@ -2710,7 +2708,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL, - destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); + destroyStateWindowOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -2784,7 +2782,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, - destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); + destroySWindowOperatorInfo, NULL); pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -3465,8 +3463,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->info = pInfo; pOperator->fpSet = - createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, - aggEncodeResultRow, aggDecodeResultRow, NULL); + createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, NULL); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); } @@ -4261,7 +4258,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo, - aggEncodeResultRow, aggDecodeResultRow, NULL); + NULL); if (downstream) { initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, pInfo->primaryTsIndex); @@ -4408,7 +4405,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream pOperator->name = "StreamSessionSemiAggOperator"; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL, - destroyStreamSessionAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); + destroyStreamSessionAggOperatorInfo, NULL); } pInfo->pGroupIdTbNameMap = @@ -4777,7 +4774,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->pTaskInfo = pTaskInfo; pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL, - destroyStreamStateOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); + destroyStreamStateOperatorInfo, NULL); initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, pInfo->primaryTsIndex); code = appendDownstream(pOperator, &downstream, 1); @@ -5054,7 +5051,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->info = miaInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL, - destroyMAIOperatorInfo, NULL, NULL, NULL); + destroyMAIOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -5366,7 +5363,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge pOperator->info = pMergeIntervalInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, - destroyMergeIntervalOperatorInfo, NULL, NULL, NULL); + destroyMergeIntervalOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -5599,7 +5596,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, - aggEncodeResultRow, aggDecodeResultRow, NULL); + NULL); initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); code = appendDownstream(pOperator, &downstream, 1);