From 2055879debe9513c779047d2669c0d555e1bf55b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 18 Nov 2022 09:47:26 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 33 +- source/libs/executor/src/executor.c | 22 +- source/libs/executor/src/executorimpl.c | 16 +- source/libs/executor/src/scanoperator.c | 329 ++++++------------ source/libs/executor/src/timewindowoperator.c | 4 +- 5 files changed, 143 insertions(+), 261 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0e2635459d..03dbca8668 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -315,26 +315,31 @@ typedef struct STableMetaCacheInfo { uint64_t cacheHit; } STableMetaCacheInfo; -typedef struct STableScanInfo { +typedef struct STableScanBase { STsdbReader* dataReader; - SReadHandle readHandle; - SLimitInfo limitInfo; SFileBlockLoadRecorder readRecorder; + SQueryTableDataCond cond; + SAggOptrPushDownInfo pdInfo; + SColMatchInfo matchInfo; + SReadHandle readHandle; + SExprSupp pseudoSup; + STableMetaCacheInfo metaCache; + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan +} STableScanBase; + +typedef struct STableScanInfo { + STableScanBase base; + SLimitInfo limitInfo; SScanInfo scanInfo; int32_t scanTimes; SSDataBlock* pResBlock; - SColMatchInfo matchInfo; - SExprSupp pseudoSup; - SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; SSampleExecInfo sample; // sample execution info int32_t currentGroupId; int32_t currentTable; int8_t scanMode; - SAggOptrPushDownInfo pdInfo; int8_t assignBlockUid; - STableMetaCacheInfo metaCache; } STableScanInfo; typedef struct STableMergeScanInfo { @@ -344,8 +349,7 @@ typedef struct STableMergeScanInfo { bool hasGroupId; uint64_t groupId; SArray* queryConds; // array of queryTableDataCond - STsdbReader* pReader; - SReadHandle readHandle; + STableScanBase base; int32_t bufPageSize; uint32_t sortBufSize; // max buffer size for in-memory sort SArray* pSortInfo; @@ -354,20 +358,11 @@ typedef struct STableMergeScanInfo { int64_t startTs; // sort start time SArray* sortSourceParams; SLimitInfo limitInfo; - SFileBlockLoadRecorder readRecorder; int64_t numOfRows; SScanInfo scanInfo; int32_t scanTimes; - SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context - SResultRowInfo* pResultRowInfo; - int32_t* rowEntryInfoOffset; - SExprInfo* pExpr; SSDataBlock* pResBlock; - SColMatchInfo matchInfo; int32_t numOfOutput; - SExprSupp pseudoSup; - SQueryTableDataCond cond; - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; // if the upstream is an interval operator, the interval info is also kept here to get the time diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e561b6e124..7dad9245d5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1026,8 +1026,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT SStreamScanInfo* pInfo = pOperator->info; if (pOffset->type == TMQ_OFFSET__LOG) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - tsdbReaderClose(pTSInfo->dataReader); - pTSInfo->dataReader = NULL; + tsdbReaderClose(pTSInfo->base.dataReader); + pTSInfo->base.dataReader = NULL; #if 0 if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) && pInfo->tqReader->pWalReader->curVersion != pOffset->version) { @@ -1079,23 +1079,23 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // TODO after dropping table, table may not found ASSERT(found); - if (pTableScanInfo->dataReader == NULL) { + if (pTableScanInfo->base.dataReader == NULL) { STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); int32_t num = tableListGetSize(pTaskInfo->pTableInfoList); - if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num, - &pTableScanInfo->dataReader, NULL) < 0 || - pTableScanInfo->dataReader == NULL) { + if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num, + &pTableScanInfo->base.dataReader, NULL) < 0 || + pTableScanInfo->base.dataReader == NULL) { ASSERT(0); } } STableKeyInfo tki = {.uid = uid}; - tsdbSetTableList(pTableScanInfo->dataReader, &tki, 1); - int64_t oldSkey = pTableScanInfo->cond.twindows.skey; - pTableScanInfo->cond.twindows.skey = ts + 1; - tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); - pTableScanInfo->cond.twindows.skey = oldSkey; + tsdbSetTableList(pTableScanInfo->base.dataReader, &tki, 1); + int64_t oldSkey = pTableScanInfo->base.cond.twindows.skey; + pTableScanInfo->base.cond.twindows.skey = ts + 1; + tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); + pTableScanInfo->base.cond.twindows.skey = oldSkey; pTableScanInfo->scanTimes = 0; qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index cf890ac3d7..c36175cd6a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1725,13 +1725,13 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan return TSDB_CODE_SUCCESS; } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = pOperator->info; - *order = pTableScanInfo->cond.order; - *scanFlag = pTableScanInfo->scanFlag; + *order = pTableScanInfo->base.cond.order; + *scanFlag = pTableScanInfo->base.scanFlag; return TSDB_CODE_SUCCESS; } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) { STableMergeScanInfo* pTableScanInfo = pOperator->info; - *order = pTableScanInfo->cond.order; - *scanFlag = pTableScanInfo->scanFlag; + *order = pTableScanInfo->base.cond.order; + *scanFlag = pTableScanInfo->base.scanFlag; return TSDB_CODE_SUCCESS; } else { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { @@ -2378,8 +2378,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = downstream->info; - pTableScanInfo->pdInfo.pExprSup = &pOperator->exprSupp; - pTableScanInfo->pdInfo.pAggSup = &pInfo->aggSup; + pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp; + pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup; } code = appendDownstream(pOperator, &downstream, 1); @@ -2744,7 +2744,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } STableScanInfo* pScanInfo = pOperator->info; - pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; + pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder; } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; @@ -2769,7 +2769,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } STableScanInfo* pScanInfo = pOperator->info; - pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; + pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder; } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode, pTaskInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2a3bfdd948..ee1114f330 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -233,25 +233,25 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro STableScanInfo* pTableScanInfo = pOperator->info; - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, + SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); if (p1 == NULL) { return NULL; } - *pPage = getBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, p1->pageId); + *pPage = getBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, p1->pageId); return (SResultRow*)((char*)(*pPage) + p1->offset); } static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) { STableScanInfo* pTableScanInfo = pOperator->info; - if (pTableScanInfo->pdInfo.pExprSup == NULL) { + if (pTableScanInfo->base.pdInfo.pExprSup == NULL) { return TSDB_CODE_SUCCESS; } - SExprSupp* pSup1 = pTableScanInfo->pdInfo.pExprSup; + SExprSupp* pSup1 = pTableScanInfo->base.pdInfo.pExprSup; SFilePage* pPage = NULL; SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->groupId, &pPage); @@ -264,7 +264,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* for (int32_t i = 0; i < pSup1->numOfExprs; ++i) { int32_t functionId = pSup1->pCtx[i].functionId; - SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->pdInfo.pExprSup->rowEntryInfoOffset); + SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->base.pdInfo.pExprSup->rowEntryInfoOffset); int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, &pBlockInfo->window); if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) { @@ -274,7 +274,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* } // release buffer pages - releaseBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, pPage); + releaseBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, pPage); if (notLoadBlock) { *status = FUNC_DATA_REQUIRED_NOT_LOAD; @@ -293,7 +293,7 @@ static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsA return keep; } -static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { +static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { bool allColumnsHaveAgg = true; SColumnDataAgg** pColAgg = NULL; @@ -330,7 +330,7 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, return true; } -static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, +static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, int32_t rows) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) { SExprSupp* pSup = &pTableScanInfo->pseudoSup; @@ -374,7 +374,11 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo } } -static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, + + + + +static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableScanInfo* pInfo = pOperator->info; @@ -492,7 +496,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca return TSDB_CODE_SUCCESS; } -static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) { +static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) { SET_REVERSE_SCAN_FLAG(pTableScanInfo); switchCtxOrder(pCtx, numOfOutput); @@ -697,7 +701,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { int64_t st = taosGetTimestampUs(); - while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { + while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) { if (isTaskKilled(pTaskInfo)) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -712,7 +716,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { SDataBlockInfo* pBInfo = &pBlock->info; int32_t rows = 0; - tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &rows, &pBInfo->uid, &pBInfo->window); + tsdbRetrieveDataBlockInfo(pTableScanInfo->base.dataReader, &rows, &pBInfo->uid, &pBInfo->window); blockDataEnsureCapacity(pBlock, rows); // todo remove it latter pBInfo->rows = rows; @@ -721,7 +725,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); uint32_t status = 0; - int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); + int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pOperator->pTaskInfo->env, code); @@ -732,10 +736,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { continue; } - pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; - pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + pOperator->resultInfo.totalRows = pTableScanInfo->base.readRecorder.totalRows; + pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime; + pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime; // todo refactor /*pTableScanInfo->lastStatus.uid = pBlock->info.uid;*/ @@ -755,7 +759,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; // The read handle is not initialized yet, since no qualified tables exists - if (pTableScanInfo->dataReader == NULL || pOperator->status == OP_EXEC_DONE) { + if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -774,15 +778,15 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo)); // do prepare for the next round table scan operation - tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); + tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); } } int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc; if (pTableScanInfo->scanTimes < total) { - if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) { - prepareForDescendingScan(pTableScanInfo, pOperator->exprSupp.pCtx, 0); - tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); + if (pTableScanInfo->base.cond.order == TSDB_ORDER_ASC) { + prepareForDescendingScan(&pTableScanInfo->base, pOperator->exprSupp.pCtx, 0); + tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo)); } @@ -799,7 +803,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { pTableScanInfo->scanFlag = REPEAT_SCAN; qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo)); - tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); + tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); } } } @@ -828,11 +832,11 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); - tsdbSetTableList(pInfo->dataReader, pTableInfo, 1); + tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1); qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, pInfo->currentTable, pTaskInfo->id.str); - tsdbReaderReset(pInfo->dataReader, &pInfo->cond); + tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; } } else { // scan table group by group sequentially @@ -845,10 +849,10 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { int32_t num = 0; STableKeyInfo* pList = NULL; tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); - ASSERT(pInfo->dataReader == NULL); + ASSERT(pInfo->base.dataReader == NULL); - int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, pList, num, - (STsdbReader**)&pInfo->dataReader, GET_TASKID(pTaskInfo)); + int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, + (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -874,8 +878,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STableKeyInfo* pList = NULL; tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); - tsdbSetTableList(pInfo->dataReader, pList, num); - tsdbReaderReset(pInfo->dataReader, &pInfo->cond); + tsdbSetTableList(pInfo->base.dataReader, pList, num); + tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; result = doGroupedTableScan(pOperator); @@ -891,7 +895,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder)); STableScanInfo* pTableScanInfo = pOptr->info; - *pRecorder = pTableScanInfo->readRecorder; + *pRecorder = pTableScanInfo->base.readRecorder; *pOptrExplain = pRecorder; *len = sizeof(SFileBlockLoadRecorder); return 0; @@ -900,17 +904,17 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr static void destroyTableScanOperatorInfo(void* param) { STableScanInfo* pTableScanInfo = (STableScanInfo*)param; blockDataDestroy(pTableScanInfo->pResBlock); - cleanupQueryTableDataCond(&pTableScanInfo->cond); + cleanupQueryTableDataCond(&pTableScanInfo->base.cond); - tsdbReaderClose(pTableScanInfo->dataReader); - pTableScanInfo->dataReader = NULL; + tsdbReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.dataReader = NULL; - if (pTableScanInfo->matchInfo.pList != NULL) { - taosArrayDestroy(pTableScanInfo->matchInfo.pList); + if (pTableScanInfo->base.matchInfo.pList != NULL) { + taosArrayDestroy(pTableScanInfo->base.matchInfo.pList); } - taosLRUCacheCleanup(pTableScanInfo->metaCache.pTableMetaEntryCache); - cleanupExprSupp(&pTableScanInfo->pseudoSup); + taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache); + cleanupExprSupp(&pTableScanInfo->base.pseudoSup); taosMemoryFreeClear(param); } @@ -927,26 +931,26 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, int32_t numOfCols = 0; int32_t code = - extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->limitInfo); - code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); + code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { goto _error; } if (pScanNode->pScanPseudoCols != NULL) { - SExprSupp* pSup = &pInfo->pseudoSup; + SExprSupp* pSup = &pInfo->base.pseudoSup; pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs); pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - pInfo->pdInfo.interval = extractIntervalInfo(pTableScanNode); - pInfo->readHandle = *readHandle; + pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode); + pInfo->base.readHandle = *readHandle; pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); @@ -969,13 +973,13 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; - pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); - if (pInfo->metaCache.pTableMetaEntryCache == NULL) { + pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); + if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) { code = terrno; goto _error; } - taosLRUCacheSetStrictCapacity(pInfo->metaCache.pTableMetaEntryCache, false); + taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo, getTableScannerExecInfo); @@ -997,7 +1001,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - pInfo->dataReader = pReadHandle; + pInfo->base.dataReader = pReadHandle; // pInfo->prevGroupId = -1; setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, @@ -1204,11 +1208,11 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou } void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { - pTableScanInfo->cond.twindows = *pWin; + pTableScanInfo->base.cond.twindows = *pWin; pTableScanInfo->scanTimes = 0; pTableScanInfo->currentGroupId = -1; - tsdbReaderClose(pTableScanInfo->dataReader); - pTableScanInfo->dataReader = NULL; + tsdbReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.dataReader = NULL; } static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs, @@ -1216,7 +1220,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0}; STableScanInfo* pTableScanInfo = pTableScanOp->info; - SQueryTableDataCond cond = pTableScanInfo->cond; + SQueryTableDataCond cond = pTableScanInfo->base.cond; cond.startVersion = -1; cond.endVersion = maxVersion; @@ -1228,7 +1232,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU blockDataCleanup(pBlock); STsdbReader* pReader = NULL; - int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, &tblInfo, 1, (STsdbReader**)&pReader, + int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, (STsdbReader**)&pReader, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { terrno = code; @@ -1247,8 +1251,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU blockDataEnsureCapacity(pBlock, rows); pBlock->info.rows = rows; - relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, rows); + relocateColumnData(pBlock, pTableScanInfo->base.matchInfo.pList, pCols, true); + doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, rows); pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->uid); } @@ -1375,8 +1379,8 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 *pRowIndex = 0; pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - tsdbReaderClose(pTableScanInfo->dataReader); - pTableScanInfo->dataReader = NULL; + tsdbReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.dataReader = NULL; return NULL; } @@ -1806,8 +1810,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } else { if (!pTaskInfo->streamInfo.returned) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - tsdbReaderClose(pTSInfo->dataReader); - pTSInfo->dataReader = NULL; + tsdbReaderClose(pTSInfo->base.dataReader); + pTSInfo->base.dataReader = NULL; tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1); if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) { @@ -1973,22 +1977,22 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 || pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond)); + memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond)); if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) { - pTSInfo->cond.startVersion = 0; - pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1; - qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->cond.startVersion, - pTSInfo->cond.endVersion); + pTSInfo->base.cond.startVersion = 0; + pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1; + qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion, + pTSInfo->base.cond.endVersion); } else { - pTSInfo->cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1; - pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2; - qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->cond.startVersion, - pTSInfo->cond.endVersion); + pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1; + pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2; + qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion, + pTSInfo->base.cond.endVersion); } /*resetTableScanInfo(pTSInfo, pWin);*/ - tsdbReaderClose(pTSInfo->dataReader); - pTSInfo->dataReader = NULL; + tsdbReaderClose(pTSInfo->base.dataReader); + pTSInfo->base.dataReader = NULL; pTSInfo->scanTimes = 0; pTSInfo->currentGroupId = -1; @@ -2009,11 +2013,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - tsdbReaderClose(pTSInfo->dataReader); - pTSInfo->dataReader = NULL; + tsdbReaderClose(pTSInfo->base.dataReader); + pTSInfo->base.dataReader = NULL; - pTSInfo->cond.startVersion = -1; - pTSInfo->cond.endVersion = -1; + pTSInfo->base.cond.startVersion = -1; + pTSInfo->base.cond.endVersion = -1; return NULL; } @@ -2118,8 +2122,8 @@ FETCH_NEXT_BLOCK: SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader); - updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId, version); + uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader); + updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); // printDataBlock(pSDB, "stream scan update"); @@ -2513,7 +2517,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info; if (pHandle->version > 0) { - pTSInfo->cond.endVersion = pHandle->version; + pTSInfo->base.cond.endVersion = pHandle->version; } STableKeyInfo* pList = NULL; @@ -2522,8 +2526,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; - pTSInfo->dataReader = NULL; - code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL); + pTSInfo->base.dataReader = NULL; + code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, &pTSInfo->base.dataReader, NULL); if (code != 0) { terrno = code; destroyTableScanOperatorInfo(pTableScanOp); @@ -2559,7 +2563,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys goto _error; } taosArrayDestroy(tableIdList); - memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond)); + memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond)); } else { taosArrayDestroy(pColIds); } @@ -4374,123 +4378,6 @@ _error: return NULL; } -// todo refactor -static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, - SSDataBlock* pBlock, uint32_t* status) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableMergeScanInfo* pInfo = pOperator->info; - - SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; - - pCost->totalBlocks += 1; - pCost->totalRows += pBlock->info.rows; - - *status = pInfo->dataBlockLoadFlag; - if (pOperator->exprSupp.pFilterInfo != NULL || - overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { - (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; - } - - SDataBlockInfo* pBlockInfo = &pBlock->info; - taosMemoryFreeClear(pBlock->pBlockAgg); - - if (*status == FUNC_DATA_REQUIRED_FILTEROUT) { - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - pCost->filterOutBlocks += 1; - return TSDB_CODE_SUCCESS; - } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { - qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - pCost->skipBlocks += 1; - - // clear all data in pBlock that are set when handing the previous block - for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { - SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i); - pcol->pData = NULL; - } - - return TSDB_CODE_SUCCESS; - } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) { - pCost->loadBlockStatis += 1; - - bool allColumnsHaveAgg = true; - SColumnDataAgg** pColAgg = NULL; - - if (allColumnsHaveAgg == true) { - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - - // todo create this buffer during creating operator - if (pBlock->pBlockAgg == NULL) { - pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES); - } - - for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i); - if (!pColMatchInfo->needOutput) { - continue; - } - pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i]; - } - - return TSDB_CODE_SUCCESS; - } else { // failed to load the block sma data, data block statistics does not exist, load data block instead - *status = FUNC_DATA_REQUIRED_DATA_LOAD; - } - } - - ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); - - // todo filter data block according to the block sma data firstly -#if 0 - if (!doFilterByBlockSMA(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { - pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, - pBlockInfo->window.ekey, pBlockInfo->rows); - (*status) = FUNC_DATA_REQUIRED_FILTEROUT; - return TSDB_CODE_SUCCESS; - } -#endif - - pCost->totalCheckedRows += pBlock->info.rows; - pCost->loadBlocks += 1; - - STsdbReader* reader = pTableScanInfo->pReader; - SArray* pCols = tsdbRetrieveDataBlock(reader, NULL); - if (pCols == NULL) { - return terrno; - } - - relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); - - // currently only the tbname pseudo column - SExprSupp* pSup = &pTableScanInfo->pseudoSup; - - int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, - pBlock->info.rows, GET_TASKID(pTaskInfo), NULL); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } - - if (pOperator->exprSupp.pFilterInfo != NULL) { - int64_t st = taosGetTimestampMs(); - doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo); - - double el = (taosGetTimestampUs() - st) / 1000.0; - pTableScanInfo->readRecorder.filterTime += el; - - if (pBlock->info.rows == 0) { - pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms", - GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el); - } else { - qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el); - } - } - - return TSDB_CODE_SUCCESS; -} - static SSDataBlock* getTableDataBlockImpl(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; @@ -4506,14 +4393,14 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { int64_t st = taosGetTimestampUs(); void* p = tableListGetInfo(pInfo->tableListInfo, readIdx + pInfo->tableStartIndex); - SReadHandle* pHandle = &pInfo->readHandle; + SReadHandle* pHandle = &pInfo->base.readHandle; - int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo)); + int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); if (code != 0) { T_LONG_JMP(pTaskInfo->env, code); } - STsdbReader* reader = pInfo->pReader; + STsdbReader* reader = pInfo->base.dataReader; while (tsdbNextDataBlock(reader)) { if (isTaskKilled(pTaskInfo)) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); @@ -4539,7 +4426,8 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } uint32_t status = 0; - code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); + loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); +// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -4552,15 +4440,15 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; - pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - tsdbReaderClose(pInfo->pReader); - pInfo->pReader = NULL; + tsdbReaderClose(pInfo->base.dataReader); + pInfo->base.dataReader = NULL; return pBlock; } - tsdbReaderClose(pInfo->pReader); - pInfo->pReader = NULL; + tsdbReaderClose(pInfo->base.dataReader); + pInfo->base.dataReader = NULL; return NULL; } @@ -4612,7 +4500,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = pInfo->tableEndIndex; - pInfo->pReader = NULL; + pInfo->base.dataReader = NULL; // todo the total available buffer should be determined by total capacity of buffer of this task. // the additional one is reserved for merge result @@ -4635,7 +4523,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { taosArrayPush(pInfo->sortSourceParams, ¶m); SQueryTableDataCond cond; - dumpSQueryTableCond(&pInfo->cond, &cond); + dumpSQueryTableCond(&pInfo->base.cond, &cond); taosArrayPush(pInfo->queryConds, &cond); } @@ -4763,7 +4651,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { void destroyTableMergeScanOperatorInfo(void* param) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; - cleanupQueryTableDataCond(&pTableScanInfo->cond); + cleanupQueryTableDataCond(&pTableScanInfo->base.cond); int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds); @@ -4774,8 +4662,8 @@ void destroyTableMergeScanOperatorInfo(void* param) { taosArrayDestroy(pTableScanInfo->sortSourceParams); - tsdbReaderClose(pTableScanInfo->pReader); - pTableScanInfo->pReader = NULL; + tsdbReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.dataReader = NULL; for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) { SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i); @@ -4783,17 +4671,16 @@ void destroyTableMergeScanOperatorInfo(void* param) { } taosArrayDestroy(pTableScanInfo->queryConds); - if (pTableScanInfo->matchInfo.pList != NULL) { - taosArrayDestroy(pTableScanInfo->matchInfo.pList); + if (pTableScanInfo->base.matchInfo.pList != NULL) { + taosArrayDestroy(pTableScanInfo->base.matchInfo.pList); } pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); taosArrayDestroy(pTableScanInfo->pSortInfo); - cleanupExprSupp(&pTableScanInfo->pseudoSup); + cleanupExprSupp(&pTableScanInfo->base.pseudoSup); - taosMemoryFreeClear(pTableScanInfo->rowEntryInfoOffset); taosMemoryFreeClear(param); } @@ -4802,7 +4689,7 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla // TODO: merge these two info into one struct STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo)); STableMergeScanInfo* pInfo = pOptr->info; - execInfo->blockRecorder = pInfo->readRecorder; + execInfo->blockRecorder = pInfo->base.readRecorder; execInfo->sortExecInfo = pInfo->sortExecInfo; *pOptrExplain = execInfo; @@ -4823,26 +4710,26 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN int32_t numOfCols = 0; int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, - &pInfo->matchInfo); + &pInfo->base.matchInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } - code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); + code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pInfo->matchInfo.pList); + taosArrayDestroy(pInfo->base.matchInfo.pList); goto _error; } if (pTableScanNode->scan.pScanPseudoCols != NULL) { - SExprSupp* pSup = &pInfo->pseudoSup; + SExprSupp* pSup = &pInfo->base.pseudoSup; pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs); pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - pInfo->readHandle = *readHandle; + pInfo->base.readHandle = *readHandle; pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); @@ -4854,7 +4741,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN } pInfo->tableListInfo = pTableListInfo; - pInfo->scanFlag = MAIN_SCAN; + pInfo->base.scanFlag = MAIN_SCAN; initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->pResBlock = createResDataBlock(pDescNode); @@ -4862,7 +4749,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam)); - pInfo->pSortInfo = generateSortByTsInfo(pInfo->matchInfo.pList, pInfo->cond.order); + pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fe29b8eda9..bf1d146b13 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2549,8 +2549,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->current = pInfo->win.skey; STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; - pScanInfo->cond.twindows = pInfo->win; - pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL; + pScanInfo->base.cond.twindows = pInfo->win; + pScanInfo->base.cond.type = TIMEWINDOW_RANGE_EXTERNAL; setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo);