From e2b61a55fc13aee349cb63abb3c7d003c1de783f Mon Sep 17 00:00:00 2001 From: Bob Liu Date: Fri, 29 Dec 2023 01:40:17 +0800 Subject: [PATCH] refactor --- source/libs/executor/inc/executorInt.h | 6 +- source/libs/executor/src/executor.c | 12 +-- source/libs/executor/src/scanoperator.c | 129 ++++++++---------------- 3 files changed, 52 insertions(+), 95 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index ae38d4940c..9acef69f9c 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -268,14 +268,12 @@ typedef struct STableScanInfo { int32_t scanTimes; SSDataBlock* pResBlock; SHashObj* pIgnoreTables; - SHashObj* pRemainTables; // remain table to process SSampleExecInfo sample; // sample execution info - int32_t currentGroupId; - int32_t currentTable; + int32_t tableStartIndex; // current group scan start + int32_t tableEndIndex; // current group scan end int8_t scanMode; int8_t assignBlockUid; uint8_t countState; // empty table count state - bool isSameGroup; // whether all tables are in the same group this scan bool hasGroupByTag; bool countOnly; bool filesetDelimited; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6cee79bff2..fb39de484f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1209,7 +1209,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0); uid = pTableInfo->uid; ts = INT64_MIN; - pScanInfo->currentTable = 0; + pScanInfo->tableEndIndex = 0; } else { taosRUnLockLatch(&pTaskInfo->lock); qError("no table in table list, %s", id); @@ -1223,16 +1223,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pInfo->pTableScanOp->resultInfo.totalRows = 0; // start from current accessed position - // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start + // we cannot start from the pScanInfo->tableEndIndex, since the commit offset may cause the rollback of the start // position, let's find it from the beginning. index = tableListFind(pTableListInfo, uid, 0); taosRUnLockLatch(&pTaskInfo->lock); if (index >= 0) { - pScanInfo->currentTable = index; + pScanInfo->tableEndIndex = index; } else { qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid, - numOfTables, pScanInfo->currentTable, id); + numOfTables, pScanInfo->tableEndIndex, id); terrno = TSDB_CODE_PAR_INTERNAL_ERROR; return -1; } @@ -1255,12 +1255,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s", - uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); + uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id); } else { pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1); pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond); qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s", - uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); + uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id); } // restore the key value diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7b2df4e206..e990d3d975 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -655,45 +655,29 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, colDataDestroy(&infoData); } -static int32_t initTableCountEnv(STableScanInfo* pTableScanInfo, const STableKeyInfo* pList, int32_t num) { - if (!pTableScanInfo->needCountEmptyTable) { - pTableScanInfo->countState = TABLE_COUNT_STATE_END; - return TSDB_CODE_SUCCESS; - } - pTableScanInfo->isSameGroup = true; - if (NULL == pTableScanInfo->pRemainTables) { - int32_t tableNum = taosArrayGetSize(pTableScanInfo->base.pTableListInfo->pTableList); - pTableScanInfo->pRemainTables = - taosHashInit(tableNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if (NULL == pTableScanInfo->pRemainTables) { - pTableScanInfo->countState = TABLE_COUNT_STATE_END; - return TSDB_CODE_OUT_OF_MEMORY; - } - } - uint64_t groupId = pList->groupId; - for (int32_t i = 0; i < num; i++) { - const STableKeyInfo* pInfo = pList + i; - if (pTableScanInfo->isSameGroup && groupId != pInfo->groupId) { - pTableScanInfo->isSameGroup = false; - } - taosHashPut(pTableScanInfo->pRemainTables, &(pInfo->uid), sizeof(pInfo->uid), &(pInfo->groupId), sizeof(pInfo->groupId)); - } - pTableScanInfo->countState = TABLE_COUNT_STATE_SCAN; - return TSDB_CODE_SUCCESS; -} -static void markTableProcessed(STableScanInfo* pTableScanInfo, uint64_t uid) { - // case0 group scanning, mark - // case1 stream scan: no need to mark - if (pTableScanInfo->countState > TABLE_COUNT_STATE_SCAN) { - return; +static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, int32_t* size) { + pInfo->tableStartIndex = pInfo->tableEndIndex + 1; + + int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); + STableKeyInfo* pStart = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + int32_t i = pInfo->tableStartIndex + 1; + for (; i < numOfTables; ++i) { + STableKeyInfo* pCur = tableListGetInfo(pInfo->base.pTableListInfo, i); + if (pCur->groupId != pStart->groupId) { + break; + } } - // case2 if all table in same group, process only once - if (pTableScanInfo->isSameGroup) { - pTableScanInfo->countState = TABLE_COUNT_STATE_END; - return; + + pInfo->tableEndIndex = i - 1; + if (!pInfo->needCountEmptyTable) { + pInfo->countState = TABLE_COUNT_STATE_END; + } else { + pInfo->countState = TABLE_COUNT_STATE_SCAN; } - taosHashRemove(pTableScanInfo->pRemainTables, &uid, sizeof(uid)); + + *pKeyInfo = pStart; + *size = i - pInfo->tableStartIndex; } static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBase* pBase, SSDataBlock* pBlock, @@ -791,7 +775,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { return NULL; } -static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKeyInfo* pList, int32_t num) { +static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; @@ -801,15 +785,11 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKey return NULL; } - if (TABLE_COUNT_STATE_NONE == pTableScanInfo->countState) { - initTableCountEnv(pTableScanInfo, pList, num); - } - // do the ascending order traverse in the first place. while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { SSDataBlock* p = doTableScanImpl(pOperator); if (p != NULL) { - markTableProcessed(pTableScanInfo, p->info.id.uid); + pTableScanInfo->countState = TABLE_COUNT_STATE_END; return p; } @@ -838,7 +818,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKey while (pTableScanInfo->scanTimes < total) { SSDataBlock* p = doTableScanImpl(pOperator); if (p != NULL) { - markTableProcessed(pTableScanInfo, p->info.id.uid); + pTableScanInfo->countState = TABLE_COUNT_STATE_END; return p; } @@ -856,29 +836,13 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKey } if (pTableScanInfo->countState < TABLE_COUNT_STATE_END) { - int32_t tb_cnt = taosHashGetSize(pTableScanInfo->pRemainTables); - if (tb_cnt) { - if (!pTableScanInfo->isSameGroup) { - // get first empty table uid, mark processed & rm from hash - void *pIte = taosHashIterate(pTableScanInfo->pRemainTables, NULL); - if (pIte != NULL) { - size_t keySize = 0; - uint64_t* pUid = taosHashGetKey(pIte, &keySize); - STableKeyInfo info = {.uid = *pUid, .groupId = *(uint64_t*)pIte}; - taosHashCancelIterate(pTableScanInfo->pRemainTables, pIte); - markTableProcessed(pTableScanInfo, *pUid); - return getBlockForEmptyTable(pOperator, &info); - } - } else { - // output one table for this group - pTableScanInfo->countState = TABLE_COUNT_STATE_END; - return getBlockForEmptyTable(pOperator, pList); - } - } + // output once for this group pTableScanInfo->countState = TABLE_COUNT_STATE_END; + STableKeyInfo* pStart = + (STableKeyInfo*)tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->tableStartIndex); + return getBlockForEmptyTable(pOperator, pStart); } - taosHashClear(pTableScanInfo->pRemainTables); return NULL; } @@ -938,8 +902,8 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - - if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { + int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); + if (pInfo->tableEndIndex + 1 >= numOfTables) { setOperatorCompleted(pOperator); if (pOperator->dynamicTask) { taosArrayClear(pInfo->base.pTableListInfo->pTableList); @@ -954,14 +918,13 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { int32_t num = 0; STableKeyInfo* pList = NULL; - tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); - pInfo->countState = TABLE_COUNT_STATE_NONE; + initNextGroupScan(pInfo, &pList, &num); pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num); pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; - SSDataBlock* result = doGroupedTableScan(pOperator, pList, num); + SSDataBlock* result = doGroupedTableScan(pOperator); if (result != NULL) { if (pOperator->dynamicTask) { result->info.id.groupId = result->info.id.uid; @@ -979,15 +942,14 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { int32_t num = 0; STableKeyInfo* pList = NULL; - if (pInfo->currentGroupId == -1) { + if (pInfo->tableEndIndex == -1) { int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); - if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo) || numOfTables == 0) { + if (pInfo->tableEndIndex + 1 == numOfTables) { setOperatorCompleted(pOperator); return NULL; } - tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); - pInfo->countState = TABLE_COUNT_STATE_NONE; + initNextGroupScan(pInfo, &pList, &num); ASSERT(pInfo->base.dataReader == NULL); int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, @@ -1001,11 +963,9 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) { pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity; } - } else { - tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); } - SSDataBlock* result = doGroupedTableScan(pOperator, pList, num); + SSDataBlock* result = doGroupedTableScan(pOperator); if (result != NULL) { if (pOperator->dynamicTask) { result->info.id.groupId = result->info.id.uid; @@ -1038,7 +998,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } if (pOperator->status == OP_EXEC_DONE) { - pInfo->currentGroupId = -1; + pInfo->tableEndIndex = -1; pOperator->status = OP_OPENED; SSDataBlock* result = NULL; while (true) { @@ -1057,29 +1017,29 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { pInfo->countState = TABLE_COUNT_STATE_END; while (1) { - SSDataBlock* result = doGroupedTableScan(pOperator, NULL, 0); + SSDataBlock* result = doGroupedTableScan(pOperator); if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) { return result; } // if no data, switch to next table and continue scan - pInfo->currentTable++; + pInfo->tableEndIndex++; taosRLockLatch(&pTaskInfo->lock); numOfTables = tableListGetSize(pInfo->base.pTableListInfo); - if (pInfo->currentTable >= numOfTables) { + if (pInfo->tableEndIndex >= numOfTables) { qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo)); taosRUnLockLatch(&pTaskInfo->lock); return NULL; } - tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); + tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableEndIndex); taosRUnLockLatch(&pTaskInfo->lock); pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1); qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables, - pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo)); + pInfo->tableEndIndex, numOfTables, GET_TASKID(pTaskInfo)); pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; @@ -1117,7 +1077,6 @@ static void destroyTableScanOperatorInfo(void* param) { STableScanInfo* pTableScanInfo = (STableScanInfo*)param; blockDataDestroy(pTableScanInfo->pResBlock); taosHashCleanup(pTableScanInfo->pIgnoreTables); - taosHashCleanup(pTableScanInfo->pRemainTables); destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI); taosMemoryFreeClear(param); } @@ -1173,7 +1132,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, goto _error; } - pInfo->currentGroupId = -1; + pInfo->tableEndIndex = -1; pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false; @@ -1268,7 +1227,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint6 pTableScanInfo->base.cond.startVersion = 0; pTableScanInfo->base.cond.endVersion = ver; pTableScanInfo->scanTimes = 0; - pTableScanInfo->currentGroupId = -1; + pTableScanInfo->tableEndIndex = -1; pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); pTableScanInfo->base.dataReader = NULL; } @@ -2170,7 +2129,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pInfo->pTableScanOp->status = OP_OPENED; pTSInfo->scanTimes = 0; - pTSInfo->currentGroupId = -1; + pTSInfo->tableEndIndex = -1; } if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {