diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d083cd4d21..a6bc4dfe97 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -163,7 +163,7 @@ typedef struct { SArray* pStopInfo; } STaskStopInfo; -typedef struct SExecTaskInfo { +struct SExecTaskInfo { STaskIdInfo id; uint32_t status; STimeWindow window; @@ -182,7 +182,7 @@ typedef struct SExecTaskInfo { struct SOperatorInfo* pRoot; SLocalFetch localFetch; STaskStopInfo stopInfo; -} SExecTaskInfo; +}; enum { OP_NOT_OPENED = 0x0, @@ -315,37 +315,39 @@ typedef struct STableMetaCacheInfo { uint64_t cacheHit; } STableMetaCacheInfo; -typedef struct STableScanInfo { +typedef struct STableScanBase { STsdbReader* dataReader; - SReadHandle readHandle; - SLimitInfo limitInfo; SFileBlockLoadRecorder readRecorder; + SQueryTableDataCond cond; + SAggOptrPushDownInfo pdInfo; + SColMatchInfo matchInfo; + SReadHandle readHandle; + SExprSupp pseudoSup; + STableMetaCacheInfo metaCache; + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; + SLimitInfo limitInfo; +} STableScanBase; + +typedef struct STableScanInfo { + STableScanBase base; SScanInfo scanInfo; int32_t scanTimes; SSDataBlock* pResBlock; - SColMatchInfo matchInfo; - SExprSupp pseudoSup; - SQueryTableDataCond cond; - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan - int32_t dataBlockLoadFlag; SSampleExecInfo sample; // sample execution info int32_t currentGroupId; int32_t currentTable; int8_t scanMode; - SAggOptrPushDownInfo pdInfo; int8_t assignBlockUid; - STableMetaCacheInfo metaCache; } STableScanInfo; typedef struct STableMergeScanInfo { - STableListInfo* tableListInfo; int32_t tableStartIndex; int32_t tableEndIndex; bool hasGroupId; uint64_t groupId; SArray* queryConds; // array of queryTableDataCond - STsdbReader* pReader; - SReadHandle readHandle; + STableScanBase base; int32_t bufPageSize; uint32_t sortBufSize; // max buffer size for in-memory sort SArray* pSortInfo; @@ -354,27 +356,12 @@ typedef struct STableMergeScanInfo { int64_t startTs; // sort start time SArray* sortSourceParams; SLimitInfo limitInfo; - SFileBlockLoadRecorder readRecorder; int64_t numOfRows; SScanInfo scanInfo; int32_t scanTimes; - SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context - SResultRowInfo* pResultRowInfo; - int32_t* rowEntryInfoOffset; - SExprInfo* pExpr; SSDataBlock* pResBlock; - SColMatchInfo matchInfo; - int32_t numOfOutput; - SExprSupp pseudoSup; - SQueryTableDataCond cond; - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan - int32_t dataBlockLoadFlag; - - // if the upstream is an interval operator, the interval info is also kept here to get the time - // window to check if current data block needs to be loaded. - SInterval interval; - SSampleExecInfo sample; // sample execution info - SSortExecInfo sortExecInfo; + SSampleExecInfo sample; // sample execution info + SSortExecInfo sortExecInfo; } STableMergeScanInfo; typedef struct STagScanInfo { @@ -387,17 +374,17 @@ typedef struct STagScanInfo { } STagScanInfo; typedef struct SLastrowScanInfo { - SSDataBlock* pRes; - SReadHandle readHandle; - void* pLastrowReader; - SColMatchInfo matchInfo; - int32_t* pSlotIds; - SExprSupp pseudoExprSup; - int32_t retrieveType; - int32_t currentGroupIndex; - SSDataBlock* pBufferredRes; - SArray* pUidList; - int32_t indexOfBufferedRes; + SSDataBlock* pRes; + SReadHandle readHandle; + void* pLastrowReader; + SColMatchInfo matchInfo; + int32_t* pSlotIds; + SExprSupp pseudoExprSup; + int32_t retrieveType; + int32_t currentGroupIndex; + SSDataBlock* pBufferredRes; + SArray* pUidList; + int32_t indexOfBufferedRes; } SLastrowScanInfo; typedef enum EStreamScanMode { @@ -414,13 +401,6 @@ enum { PROJECT_RETRIEVE_DONE = 0x2, }; -typedef struct SCatchSupporter { - SHashObj* pWindowHashTable; // quick locate the window object for each window - SDiskbasedBuf* pDataBuf; // buffer based on blocked-wised disk file - int32_t keySize; - int64_t* pKeyBuf; -} SCatchSupporter; - typedef struct SStreamAggSupporter { int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row SSDataBlock* pScanBlock; @@ -1042,8 +1022,8 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, - SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, + SExecTaskInfo* pTaskInfo); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e561b6e124..7dad9245d5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1026,8 +1026,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT SStreamScanInfo* pInfo = pOperator->info; if (pOffset->type == TMQ_OFFSET__LOG) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - tsdbReaderClose(pTSInfo->dataReader); - pTSInfo->dataReader = NULL; + tsdbReaderClose(pTSInfo->base.dataReader); + pTSInfo->base.dataReader = NULL; #if 0 if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) && pInfo->tqReader->pWalReader->curVersion != pOffset->version) { @@ -1079,23 +1079,23 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // TODO after dropping table, table may not found ASSERT(found); - if (pTableScanInfo->dataReader == NULL) { + if (pTableScanInfo->base.dataReader == NULL) { STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); int32_t num = tableListGetSize(pTaskInfo->pTableInfoList); - if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num, - &pTableScanInfo->dataReader, NULL) < 0 || - pTableScanInfo->dataReader == NULL) { + if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num, + &pTableScanInfo->base.dataReader, NULL) < 0 || + pTableScanInfo->base.dataReader == NULL) { ASSERT(0); } } STableKeyInfo tki = {.uid = uid}; - tsdbSetTableList(pTableScanInfo->dataReader, &tki, 1); - int64_t oldSkey = pTableScanInfo->cond.twindows.skey; - pTableScanInfo->cond.twindows.skey = ts + 1; - tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); - pTableScanInfo->cond.twindows.skey = oldSkey; + tsdbSetTableList(pTableScanInfo->base.dataReader, &tki, 1); + int64_t oldSkey = pTableScanInfo->base.cond.twindows.skey; + pTableScanInfo->base.cond.twindows.skey = ts + 1; + tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); + pTableScanInfo->base.cond.twindows.skey = oldSkey; pTableScanInfo->scanTimes = 0; qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6ee0f11e48..0dd5765aa4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1000,12 +1000,6 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc return TSDB_CODE_SUCCESS; } -static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) { - if (pTableQueryInfo == NULL) { - return; - } -} - void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { if (status == TASK_NOT_COMPLETED) { pTaskInfo->status = status; @@ -1652,55 +1646,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey); -static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) { - size_t size = taosArrayGetSize(groupInfo); - if (size == 0) { - return true; - } - - for (int32_t i = 0; i < size; ++i) { - int32_t* index = taosArrayGet(groupInfo, i); - - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index); - bool isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL); - - if ((isNull && buf[i] != NULL) || (!isNull && buf[i] == NULL)) { - return false; - } - - char* pCell = colDataGetData(pColInfo, rowIndex); - if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { - if (varDataLen(pCell) != varDataLen(buf[i])) { - return false; - } else { - if (memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) { - return false; - } - } - } else { - if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) { - return false; - } - } - } - - return 0; -} - -static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) { - int32_t size = (int32_t)taosArrayGetSize(pColumnList); - - for (int32_t i = 0; i < size; ++i) { - int32_t* index = taosArrayGet(pColumnList, i); - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index); - - char* data = colDataGetData(pColInfo, rowIndex); - memcpy(rowColData[i], data, colDataGetLength(pColInfo, rowIndex)); - } - - return true; -} - int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { // todo add more information about exchange operation int32_t type = pOperator->operatorType; @@ -1712,13 +1657,13 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan return TSDB_CODE_SUCCESS; } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = pOperator->info; - *order = pTableScanInfo->cond.order; - *scanFlag = pTableScanInfo->scanFlag; + *order = pTableScanInfo->base.cond.order; + *scanFlag = pTableScanInfo->base.scanFlag; return TSDB_CODE_SUCCESS; } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) { STableMergeScanInfo* pTableScanInfo = pOperator->info; - *order = pTableScanInfo->cond.order; - *scanFlag = pTableScanInfo->scanFlag; + *order = pTableScanInfo->base.cond.order; + *scanFlag = pTableScanInfo->base.scanFlag; return TSDB_CODE_SUCCESS; } else { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { @@ -2365,8 +2310,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = downstream->info; - pTableScanInfo->pdInfo.pExprSup = &pOperator->exprSupp; - pTableScanInfo->pdInfo.pAggSup = &pInfo->aggSup; + pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp; + pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup; } code = appendDownstream(pOperator, &downstream, 1); @@ -2731,7 +2676,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } STableScanInfo* pScanInfo = pOperator->info; - pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; + pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder; } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; @@ -2749,14 +2694,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo); + pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); if (NULL == pOperator) { pTaskInfo->code = terrno; return NULL; } STableScanInfo* pScanInfo = pOperator->info; - pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; + pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder; } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode, pTaskInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7ef5585bdf..916b6df969 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -233,25 +233,25 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro STableScanInfo* pTableScanInfo = pOperator->info; - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, + SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); if (p1 == NULL) { return NULL; } - *pPage = getBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, p1->pageId); + *pPage = getBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, p1->pageId); return (SResultRow*)((char*)(*pPage) + p1->offset); } static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) { STableScanInfo* pTableScanInfo = pOperator->info; - if (pTableScanInfo->pdInfo.pExprSup == NULL) { + if (pTableScanInfo->base.pdInfo.pExprSup == NULL) { return TSDB_CODE_SUCCESS; } - SExprSupp* pSup1 = pTableScanInfo->pdInfo.pExprSup; + SExprSupp* pSup1 = pTableScanInfo->base.pdInfo.pExprSup; SFilePage* pPage = NULL; SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->groupId, &pPage); @@ -264,7 +264,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* for (int32_t i = 0; i < pSup1->numOfExprs; ++i) { int32_t functionId = pSup1->pCtx[i].functionId; - SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->pdInfo.pExprSup->rowEntryInfoOffset); + SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->base.pdInfo.pExprSup->rowEntryInfoOffset); int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, &pBlockInfo->window); if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) { @@ -274,7 +274,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* } // release buffer pages - releaseBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, pPage); + releaseBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, pPage); if (notLoadBlock) { *status = FUNC_DATA_REQUIRED_NOT_LOAD; @@ -293,7 +293,7 @@ static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsA return keep; } -static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { +static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { bool allColumnsHaveAgg = true; SColumnDataAgg** pColAgg = NULL; @@ -330,7 +330,7 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, return true; } -static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, +static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, int32_t rows) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) { SExprSupp* pSup = &pTableScanInfo->pseudoSup; @@ -374,19 +374,16 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo } } -static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, +static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableScanInfo* pInfo = pOperator->info; - SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; pCost->totalBlocks += 1; pCost->totalRows += pBlock->info.rows; bool loadSMA = false; - - *status = pInfo->dataBlockLoadFlag; + *status = pTableScanInfo->dataBlockLoadFlag; if (pOperator->exprSupp.pFilterInfo != NULL || overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) { (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; @@ -485,14 +482,14 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca } } - applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo, pOperator); + applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator); pCost->totalRows += pBlock->info.rows; - pInfo->limitInfo.numOfOutputRows = pCost->totalRows; + pTableScanInfo->limitInfo.numOfOutputRows = pCost->totalRows; return TSDB_CODE_SUCCESS; } -static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) { +static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) { SET_REVERSE_SCAN_FLAG(pTableScanInfo); switchCtxOrder(pCtx, numOfOutput); @@ -700,7 +697,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { int64_t st = taosGetTimestampUs(); - while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { + while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) { if (isTaskKilled(pTaskInfo)) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -715,7 +712,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { SDataBlockInfo* pBInfo = &pBlock->info; int32_t rows = 0; - tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &rows, &pBInfo->uid, &pBInfo->window); + tsdbRetrieveDataBlockInfo(pTableScanInfo->base.dataReader, &rows, &pBInfo->uid, &pBInfo->window); blockDataEnsureCapacity(pBlock, rows); // todo remove it latter pBInfo->rows = rows; @@ -724,7 +721,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); uint32_t status = 0; - int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); + int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pOperator->pTaskInfo->env, code); @@ -735,10 +732,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { continue; } - pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; - pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + pOperator->resultInfo.totalRows = pTableScanInfo->base.readRecorder.totalRows; + pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime; + pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime; // todo refactor /*pTableScanInfo->lastStatus.uid = pBlock->info.uid;*/ @@ -758,7 +755,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; // The read handle is not initialized yet, since no qualified tables exists - if (pTableScanInfo->dataReader == NULL || pOperator->status == OP_EXEC_DONE) { + if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -773,19 +770,19 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); - pTableScanInfo->scanFlag = REPEAT_SCAN; + pTableScanInfo->base.scanFlag = REPEAT_SCAN; qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo)); // do prepare for the next round table scan operation - tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); + tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); } } int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc; if (pTableScanInfo->scanTimes < total) { - if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) { - prepareForDescendingScan(pTableScanInfo, pOperator->exprSupp.pCtx, 0); - tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); + if (pTableScanInfo->base.cond.order == TSDB_ORDER_ASC) { + prepareForDescendingScan(&pTableScanInfo->base, pOperator->exprSupp.pCtx, 0); + tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo)); } @@ -799,10 +796,10 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { if (pTableScanInfo->scanTimes < total) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); - pTableScanInfo->scanFlag = REPEAT_SCAN; + pTableScanInfo->base.scanFlag = REPEAT_SCAN; qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo)); - tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); + tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); } } } @@ -831,11 +828,11 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); - tsdbSetTableList(pInfo->dataReader, pTableInfo, 1); + tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1); qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, pInfo->currentTable, pTaskInfo->id.str); - tsdbReaderReset(pInfo->dataReader, &pInfo->cond); + tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; } } else { // scan table group by group sequentially @@ -848,10 +845,10 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { int32_t num = 0; STableKeyInfo* pList = NULL; tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); - ASSERT(pInfo->dataReader == NULL); + ASSERT(pInfo->base.dataReader == NULL); - int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, pList, num, - (STsdbReader**)&pInfo->dataReader, GET_TASKID(pTaskInfo)); + int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, + (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -870,15 +867,15 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // reset value for the next group data output pOperator->status = OP_OPENED; - pInfo->limitInfo.numOfOutputRows = 0; - pInfo->limitInfo.remainOffset = pInfo->limitInfo.limit.offset; + pInfo->base.limitInfo.numOfOutputRows = 0; + pInfo->base.limitInfo.remainOffset = pInfo->base.limitInfo.limit.offset; int32_t num = 0; STableKeyInfo* pList = NULL; tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); - tsdbSetTableList(pInfo->dataReader, pList, num); - tsdbReaderReset(pInfo->dataReader, &pInfo->cond); + tsdbSetTableList(pInfo->base.dataReader, pList, num); + tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; result = doGroupedTableScan(pOperator); @@ -894,7 +891,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder)); STableScanInfo* pTableScanInfo = pOptr->info; - *pRecorder = pTableScanInfo->readRecorder; + *pRecorder = pTableScanInfo->base.readRecorder; *pOptrExplain = pRecorder; *len = sizeof(SFileBlockLoadRecorder); return 0; @@ -903,17 +900,17 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr static void destroyTableScanOperatorInfo(void* param) { STableScanInfo* pTableScanInfo = (STableScanInfo*)param; blockDataDestroy(pTableScanInfo->pResBlock); - cleanupQueryTableDataCond(&pTableScanInfo->cond); + cleanupQueryTableDataCond(&pTableScanInfo->base.cond); - tsdbReaderClose(pTableScanInfo->dataReader); - pTableScanInfo->dataReader = NULL; + tsdbReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.dataReader = NULL; - if (pTableScanInfo->matchInfo.pList != NULL) { - taosArrayDestroy(pTableScanInfo->matchInfo.pList); + if (pTableScanInfo->base.matchInfo.pList != NULL) { + taosArrayDestroy(pTableScanInfo->base.matchInfo.pList); } - taosLRUCacheCleanup(pTableScanInfo->metaCache.pTableMetaEntryCache); - cleanupExprSupp(&pTableScanInfo->pseudoSup); + taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache); + cleanupExprSupp(&pTableScanInfo->base.pseudoSup); taosMemoryFreeClear(param); } @@ -930,30 +927,32 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, int32_t numOfCols = 0; int32_t code = - extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->limitInfo); - code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); + initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo); + code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { goto _error; } if (pScanNode->pScanPseudoCols != NULL) { - SExprSupp* pSup = &pInfo->pseudoSup; + SExprSupp* pSup = &pInfo->base.pseudoSup; pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs); pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - pInfo->pdInfo.interval = extractIntervalInfo(pTableScanNode); - pInfo->readHandle = *readHandle; + + pInfo->base.scanFlag = MAIN_SCAN; + pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode); + pInfo->base.readHandle = *readHandle; pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); - pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; + pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired; initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pResBlock = createResDataBlock(pDescNode); @@ -964,7 +963,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, goto _error; } - pInfo->scanFlag = MAIN_SCAN; pInfo->currentGroupId = -1; pInfo->assignBlockUid = pTableScanNode->assignBlockUid; @@ -972,13 +970,13 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; - pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); - if (pInfo->metaCache.pTableMetaEntryCache == NULL) { + pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); + if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) { code = terrno; goto _error; } - taosLRUCacheSetStrictCapacity(pInfo->metaCache.pTableMetaEntryCache, false); + taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo, getTableScannerExecInfo); @@ -1000,7 +998,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - pInfo->dataReader = pReadHandle; + pInfo->base.dataReader = pReadHandle; // pInfo->prevGroupId = -1; setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, @@ -1207,11 +1205,11 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou } void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { - pTableScanInfo->cond.twindows = *pWin; + pTableScanInfo->base.cond.twindows = *pWin; pTableScanInfo->scanTimes = 0; pTableScanInfo->currentGroupId = -1; - tsdbReaderClose(pTableScanInfo->dataReader); - pTableScanInfo->dataReader = NULL; + tsdbReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.dataReader = NULL; } static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs, @@ -1219,7 +1217,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0}; STableScanInfo* pTableScanInfo = pTableScanOp->info; - SQueryTableDataCond cond = pTableScanInfo->cond; + SQueryTableDataCond cond = pTableScanInfo->base.cond; cond.startVersion = -1; cond.endVersion = maxVersion; @@ -1231,7 +1229,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU blockDataCleanup(pBlock); STsdbReader* pReader = NULL; - int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, &tblInfo, 1, (STsdbReader**)&pReader, + int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, (STsdbReader**)&pReader, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { terrno = code; @@ -1250,8 +1248,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU blockDataEnsureCapacity(pBlock, rows); pBlock->info.rows = rows; - relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, rows); + relocateColumnData(pBlock, pTableScanInfo->base.matchInfo.pList, pCols, true); + doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, rows); pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->uid); } @@ -1378,8 +1376,8 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 *pRowIndex = 0; pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - tsdbReaderClose(pTableScanInfo->dataReader); - pTableScanInfo->dataReader = NULL; + tsdbReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.dataReader = NULL; return NULL; } @@ -1813,8 +1811,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } else { if (!pTaskInfo->streamInfo.returned) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - tsdbReaderClose(pTSInfo->dataReader); - pTSInfo->dataReader = NULL; + tsdbReaderClose(pTSInfo->base.dataReader); + pTSInfo->base.dataReader = NULL; tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1); if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) { @@ -1980,22 +1978,22 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 || pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond)); + memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond)); if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) { - pTSInfo->cond.startVersion = 0; - pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1; - qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->cond.startVersion, - pTSInfo->cond.endVersion); + pTSInfo->base.cond.startVersion = 0; + pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1; + qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion, + pTSInfo->base.cond.endVersion); } else { - pTSInfo->cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1; - pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2; - qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->cond.startVersion, - pTSInfo->cond.endVersion); + pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1; + pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2; + qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion, + pTSInfo->base.cond.endVersion); } /*resetTableScanInfo(pTSInfo, pWin);*/ - tsdbReaderClose(pTSInfo->dataReader); - pTSInfo->dataReader = NULL; + tsdbReaderClose(pTSInfo->base.dataReader); + pTSInfo->base.dataReader = NULL; pTSInfo->scanTimes = 0; pTSInfo->currentGroupId = -1; @@ -2016,11 +2014,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - tsdbReaderClose(pTSInfo->dataReader); - pTSInfo->dataReader = NULL; + tsdbReaderClose(pTSInfo->base.dataReader); + pTSInfo->base.dataReader = NULL; - pTSInfo->cond.startVersion = -1; - pTSInfo->cond.endVersion = -1; + pTSInfo->base.cond.startVersion = -1; + pTSInfo->base.cond.endVersion = -1; return NULL; } @@ -2128,8 +2126,8 @@ FETCH_NEXT_BLOCK: SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader); - updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId, version); + uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader); + updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); // printDataBlock(pSDB, "stream scan update"); @@ -2520,7 +2518,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info; if (pHandle->version > 0) { - pTSInfo->cond.endVersion = pHandle->version; + pTSInfo->base.cond.endVersion = pHandle->version; } STableKeyInfo* pList = NULL; @@ -2529,8 +2527,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; - pTSInfo->dataReader = NULL; - code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL); + pTSInfo->base.dataReader = NULL; + code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, &pTSInfo->base.dataReader, NULL); if (code != 0) { terrno = code; destroyTableScanOperatorInfo(pTableScanOp); @@ -2566,7 +2564,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys goto _error; } taosArrayDestroy(tableIdList); - memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond)); + memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond)); } else { taosArrayDestroy(pColIds); } @@ -4381,123 +4379,6 @@ _error: return NULL; } -// todo refactor -static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, - SSDataBlock* pBlock, uint32_t* status) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableMergeScanInfo* pInfo = pOperator->info; - - SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; - - pCost->totalBlocks += 1; - pCost->totalRows += pBlock->info.rows; - - *status = pInfo->dataBlockLoadFlag; - if (pOperator->exprSupp.pFilterInfo != NULL || - overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { - (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; - } - - SDataBlockInfo* pBlockInfo = &pBlock->info; - taosMemoryFreeClear(pBlock->pBlockAgg); - - if (*status == FUNC_DATA_REQUIRED_FILTEROUT) { - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - pCost->filterOutBlocks += 1; - return TSDB_CODE_SUCCESS; - } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { - qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - pCost->skipBlocks += 1; - - // clear all data in pBlock that are set when handing the previous block - for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { - SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i); - pcol->pData = NULL; - } - - return TSDB_CODE_SUCCESS; - } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) { - pCost->loadBlockStatis += 1; - - bool allColumnsHaveAgg = true; - SColumnDataAgg** pColAgg = NULL; - - if (allColumnsHaveAgg == true) { - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - - // todo create this buffer during creating operator - if (pBlock->pBlockAgg == NULL) { - pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES); - } - - for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i); - if (!pColMatchInfo->needOutput) { - continue; - } - pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i]; - } - - return TSDB_CODE_SUCCESS; - } else { // failed to load the block sma data, data block statistics does not exist, load data block instead - *status = FUNC_DATA_REQUIRED_DATA_LOAD; - } - } - - ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); - - // todo filter data block according to the block sma data firstly -#if 0 - if (!doFilterByBlockSMA(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { - pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, - pBlockInfo->window.ekey, pBlockInfo->rows); - (*status) = FUNC_DATA_REQUIRED_FILTEROUT; - return TSDB_CODE_SUCCESS; - } -#endif - - pCost->totalCheckedRows += pBlock->info.rows; - pCost->loadBlocks += 1; - - STsdbReader* reader = pTableScanInfo->pReader; - SArray* pCols = tsdbRetrieveDataBlock(reader, NULL); - if (pCols == NULL) { - return terrno; - } - - relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); - - // currently only the tbname pseudo column - SExprSupp* pSup = &pTableScanInfo->pseudoSup; - - int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, - pBlock->info.rows, GET_TASKID(pTaskInfo), NULL); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } - - if (pOperator->exprSupp.pFilterInfo != NULL) { - int64_t st = taosGetTimestampMs(); - doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo); - - double el = (taosGetTimestampUs() - st) / 1000.0; - pTableScanInfo->readRecorder.filterTime += el; - - if (pBlock->info.rows == 0) { - pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms", - GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el); - } else { - qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el); - } - } - - return TSDB_CODE_SUCCESS; -} - static SSDataBlock* getTableDataBlockImpl(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; @@ -4512,15 +4393,15 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { int64_t st = taosGetTimestampUs(); - void* p = tableListGetInfo(pInfo->tableListInfo, readIdx + pInfo->tableStartIndex); - SReadHandle* pHandle = &pInfo->readHandle; + void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); + SReadHandle* pHandle = &pInfo->base.readHandle; - int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo)); + int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); if (code != 0) { T_LONG_JMP(pTaskInfo->env, code); } - STsdbReader* reader = pInfo->pReader; + STsdbReader* reader = pInfo->base.dataReader; while (tsdbNextDataBlock(reader)) { if (isTaskKilled(pTaskInfo)) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); @@ -4546,7 +4427,8 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } uint32_t status = 0; - code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); + loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); +// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -4559,15 +4441,15 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; - pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - tsdbReaderClose(pInfo->pReader); - pInfo->pReader = NULL; + tsdbReaderClose(pInfo->base.dataReader); + pInfo->base.dataReader = NULL; return pBlock; } - tsdbReaderClose(pInfo->pReader); - pInfo->pReader = NULL; + tsdbReaderClose(pInfo->base.dataReader); + pInfo->base.dataReader = NULL; return NULL; } @@ -4605,10 +4487,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; { - size_t numOfTables = tableListGetSize(pInfo->tableListInfo); + size_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); int32_t i = pInfo->tableStartIndex + 1; for (; i < numOfTables; ++i) { - STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->tableListInfo, i); + STableKeyInfo* tableKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i); if (tableKeyInfo->groupId != pInfo->groupId) { break; } @@ -4619,7 +4501,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = pInfo->tableEndIndex; - pInfo->pReader = NULL; + pInfo->base.dataReader = NULL; // todo the total available buffer should be determined by total capacity of buffer of this task. // the additional one is reserved for merge result @@ -4642,7 +4524,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { taosArrayPush(pInfo->sortSourceParams, ¶m); SQueryTableDataCond cond; - dumpSQueryTableCond(&pInfo->cond, &cond); + dumpSQueryTableCond(&pInfo->base.cond, &cond); taosArrayPush(pInfo->queryConds, &cond); } @@ -4733,7 +4615,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - size_t tableListSize = tableListGetSize(pInfo->tableListInfo); + size_t tableListSize = tableListGetSize(pTaskInfo->pTableInfoList); if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; @@ -4742,7 +4624,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { return NULL; } pInfo->tableStartIndex = 0; - pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->tableListInfo, pInfo->tableStartIndex))->groupId; + pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex))->groupId; startGroupTableMergeScan(pOperator); } @@ -4761,7 +4643,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { break; } pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = tableListGetInfo(pInfo->tableListInfo, pInfo->tableStartIndex)->groupId; + pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId; startGroupTableMergeScan(pOperator); } } @@ -4771,7 +4653,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { void destroyTableMergeScanOperatorInfo(void* param) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; - cleanupQueryTableDataCond(&pTableScanInfo->cond); + cleanupQueryTableDataCond(&pTableScanInfo->base.cond); int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds); @@ -4784,8 +4666,8 @@ void destroyTableMergeScanOperatorInfo(void* param) { tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL; - tsdbReaderClose(pTableScanInfo->pReader); - pTableScanInfo->pReader = NULL; + tsdbReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.dataReader = NULL; for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) { SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i); @@ -4793,17 +4675,20 @@ void destroyTableMergeScanOperatorInfo(void* param) { } taosArrayDestroy(pTableScanInfo->queryConds); - if (pTableScanInfo->matchInfo.pList != NULL) { - taosArrayDestroy(pTableScanInfo->matchInfo.pList); + if (pTableScanInfo->base.matchInfo.pList != NULL) { + taosArrayDestroy(pTableScanInfo->base.matchInfo.pList); } pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); taosArrayDestroy(pTableScanInfo->pSortInfo); - cleanupExprSupp(&pTableScanInfo->pseudoSup); + cleanupExprSupp(&pTableScanInfo->base.pseudoSup); + + tsdbReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.dataReader = NULL; + taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache); - taosMemoryFreeClear(pTableScanInfo->rowEntryInfoOffset); taosMemoryFreeClear(param); } @@ -4812,7 +4697,7 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla // TODO: merge these two info into one struct STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo)); STableMergeScanInfo* pInfo = pOptr->info; - execInfo->blockRecorder = pInfo->readRecorder; + execInfo->blockRecorder = pInfo->base.readRecorder; execInfo->sortExecInfo = pInfo->sortExecInfo; *pOptrExplain = execInfo; @@ -4821,8 +4706,8 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla return TSDB_CODE_SUCCESS; } -SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, - SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, + SExecTaskInfo* pTaskInfo) { STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -4833,38 +4718,46 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN int32_t numOfCols = 0; int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, - &pInfo->matchInfo); + &pInfo->base.matchInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } - code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); + code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pInfo->matchInfo.pList); + taosArrayDestroy(pInfo->base.matchInfo.pList); goto _error; } if (pTableScanNode->scan.pScanPseudoCols != NULL) { - SExprSupp* pSup = &pInfo->pseudoSup; + SExprSupp* pSup = &pInfo->base.pseudoSup; pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs); pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - pInfo->readHandle = *readHandle; - pInfo->interval = extractIntervalInfo(pTableScanNode); + pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); + if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) { + code = terrno; + goto _error; + } + + pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD; + pInfo->base.scanFlag = MAIN_SCAN; + pInfo->base.readHandle = *readHandle; + + pInfo->base.limitInfo.limit.limit = -1; + pInfo->base.limitInfo.slimit.limit = -1; + pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); - pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { goto _error; } - pInfo->tableListInfo = pTableListInfo; - pInfo->scanFlag = MAIN_SCAN; initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->pResBlock = createResDataBlock(pDescNode); @@ -4872,7 +4765,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam)); - pInfo->pSortInfo = generateSortByTsInfo(pInfo->matchInfo.pList, pInfo->cond.order); + pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d685324b6b..0573450be2 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2547,8 +2547,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; - pScanInfo->cond.twindows = pInfo->win; - pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL; + pScanInfo->base.cond.twindows = pInfo->win; + pScanInfo->base.cond.type = TIMEWINDOW_RANGE_EXTERNAL; } setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,