From 875989aa309196218519f16222e8c2653b93dede Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 18 Nov 2022 11:24:56 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 10 ++--- source/libs/executor/src/executorimpl.c | 57 +------------------------ source/libs/executor/src/scanoperator.c | 18 ++++---- 3 files changed, 13 insertions(+), 72 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 03dbca8668..be52a10dcb 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -163,7 +163,7 @@ typedef struct { SArray* pStopInfo; } STaskStopInfo; -typedef struct SExecTaskInfo { +struct SExecTaskInfo { STaskIdInfo id; uint32_t status; STimeWindow window; @@ -182,7 +182,7 @@ typedef struct SExecTaskInfo { struct SOperatorInfo* pRoot; SLocalFetch localFetch; STaskStopInfo stopInfo; -} SExecTaskInfo; +}; enum { OP_NOT_OPENED = 0x0, @@ -343,7 +343,6 @@ typedef struct STableScanInfo { } STableScanInfo; typedef struct STableMergeScanInfo { - STableListInfo* tableListInfo; int32_t tableStartIndex; int32_t tableEndIndex; bool hasGroupId; @@ -363,7 +362,6 @@ typedef struct STableMergeScanInfo { int32_t scanTimes; SSDataBlock* pResBlock; int32_t numOfOutput; - int32_t dataBlockLoadFlag; // if the upstream is an interval operator, the interval info is also kept here to get the time // window to check if current data block needs to be loaded. @@ -1041,8 +1039,8 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, - SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, + SExecTaskInfo* pTaskInfo); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c36175cd6a..2b4b274f97 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1000,12 +1000,6 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc return TSDB_CODE_SUCCESS; } -static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) { - if (pTableQueryInfo == NULL) { - return; - } -} - void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { if (status == TASK_NOT_COMPLETED) { pTaskInfo->status = status; @@ -1665,55 +1659,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey); -static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) { - size_t size = taosArrayGetSize(groupInfo); - if (size == 0) { - return true; - } - - for (int32_t i = 0; i < size; ++i) { - int32_t* index = taosArrayGet(groupInfo, i); - - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index); - bool isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL); - - if ((isNull && buf[i] != NULL) || (!isNull && buf[i] == NULL)) { - return false; - } - - char* pCell = colDataGetData(pColInfo, rowIndex); - if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { - if (varDataLen(pCell) != varDataLen(buf[i])) { - return false; - } else { - if (memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) { - return false; - } - } - } else { - if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) { - return false; - } - } - } - - return 0; -} - -static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) { - int32_t size = (int32_t)taosArrayGetSize(pColumnList); - - for (int32_t i = 0; i < size; ++i) { - int32_t* index = taosArrayGet(pColumnList, i); - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index); - - char* data = colDataGetData(pColInfo, rowIndex); - memcpy(rowColData[i], data, colDataGetLength(pColInfo, rowIndex)); - } - - return true; -} - int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { // todo add more information about exchange operation int32_t type = pOperator->operatorType; @@ -2762,7 +2707,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo); + pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); if (NULL == pOperator) { pTaskInfo->code = terrno; return NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ee1114f330..8e8a86670b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4392,7 +4392,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { int64_t st = taosGetTimestampUs(); - void* p = tableListGetInfo(pInfo->tableListInfo, readIdx + pInfo->tableStartIndex); + void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->base.readHandle; int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); @@ -4486,10 +4486,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; { - size_t numOfTables = tableListGetSize(pInfo->tableListInfo); + size_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); int32_t i = pInfo->tableStartIndex + 1; for (; i < numOfTables; ++i) { - STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->tableListInfo, i); + STableKeyInfo* tableKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i); if (tableKeyInfo->groupId != pInfo->groupId) { break; } @@ -4613,7 +4613,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - size_t tableListSize = tableListGetSize(pInfo->tableListInfo); + size_t tableListSize = tableListGetSize(pTaskInfo->pTableInfoList); if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; @@ -4622,7 +4622,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { return NULL; } pInfo->tableStartIndex = 0; - pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->tableListInfo, pInfo->tableStartIndex))->groupId; + pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex))->groupId; startGroupTableMergeScan(pOperator); } @@ -4641,7 +4641,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { break; } pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = tableListGetInfo(pInfo->tableListInfo, pInfo->tableStartIndex)->groupId; + pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId; startGroupTableMergeScan(pOperator); } } @@ -4698,8 +4698,8 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla return TSDB_CODE_SUCCESS; } -SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, - SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, + SExecTaskInfo* pTaskInfo) { STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -4733,14 +4733,12 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); - pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { goto _error; } - pInfo->tableListInfo = pTableListInfo; pInfo->base.scanFlag = MAIN_SCAN; initResultSizeInfo(&pOperator->resultInfo, 1024);