diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 2523b87cfb..8b0bef5fa6 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -217,6 +217,12 @@ enum { TABLE_SCAN__BLOCK_ORDER = 2, }; +typedef enum ETableCountState { + TABLE_COUNT_STATE_NONE = 0, // before start scan + TABLE_COUNT_STATE_SCAN = 1, // scanning + TABLE_COUNT_STATE_END = 2, // finish or noneed to process +} ETableCountState; + typedef struct SAggSupporter { SSHashObj* pResultRowHashTable; // quick locate the window object for each result char* keyBuf; // window key buffer @@ -262,17 +268,18 @@ typedef struct STableScanInfo { int32_t scanTimes; SSDataBlock* pResBlock; SHashObj* pIgnoreTables; - SHashObj* pValuedTables; // non empty table uids + SHashObj* pRemainTables; // remain table to process SSampleExecInfo sample; // sample execution info int32_t currentGroupId; int32_t currentTable; int8_t scanMode; int8_t assignBlockUid; + uint8_t countState; // empty table count state + bool isOneGroup; // whether or not only one group in this scan bool hasGroupByTag; bool countOnly; bool filesetDelimited; bool needCountEmptyTable; - bool processingEmptyTable; } STableScanInfo; typedef struct STableMergeScanInfo { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c69d963c79..31fe31007c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -655,26 +655,44 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, colDataDestroy(&infoData); } - -// record processed (non empty) table -static int32_t markTableProcessed(STableScanInfo* pTableScanInfo, uint64_t uid) { +static int32_t initRemainTable(STableScanInfo* pTableScanInfo, const STableKeyInfo* pList, int32_t num) { if (!pTableScanInfo->needCountEmptyTable) { return TSDB_CODE_SUCCESS; } - if (NULL == pTableScanInfo->pValuedTables) { + pTableScanInfo->isOneGroup = true; + if (NULL == pTableScanInfo->pRemainTables) { int32_t tableNum = taosArrayGetSize(pTableScanInfo->base.pTableListInfo->pTableList); - pTableScanInfo->pValuedTables = + pTableScanInfo->pRemainTables = taosHashInit(tableNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if (NULL == pTableScanInfo->pValuedTables) { + if (NULL == pTableScanInfo->pRemainTables) { + pTableScanInfo->countState = TABLE_COUNT_STATE_END; return TSDB_CODE_OUT_OF_MEMORY; } } - - taosHashPut(pTableScanInfo->pValuedTables, &uid, sizeof(uid), &pTableScanInfo->scanTimes, - sizeof(pTableScanInfo->scanTimes)); + uint64_t groupId = 0; + for (int32_t i = 0; i < num; i++) { + const STableKeyInfo* pInfo = pList + i; + if (pTableScanInfo->isOneGroup) { + if (i == 0) { + groupId = pInfo->groupId; + } else if (groupId != pInfo->groupId) { + pTableScanInfo->isOneGroup = false; + } + } + taosHashPut(pTableScanInfo->pRemainTables, &(pInfo->uid), sizeof(pInfo->uid), &(pInfo->groupId), sizeof(pInfo->groupId)); + } return TSDB_CODE_SUCCESS; } +static void markTableProcessed(STableScanInfo* pTableScanInfo, uint64_t uid) { + // case0 group scanning, mark + // case1 stream scan: no need to mark + if (pTableScanInfo->countState > TABLE_COUNT_STATE_SCAN) { + return; + } + taosHashRemove(pTableScanInfo->pRemainTables, &uid, sizeof(uid)); +} + static SSDataBlock* getBlockForEmptyTable(SOperatorInfo* pOperator, const STableKeyInfo* tbInfo) { STableScanInfo* pTableScanInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -770,14 +788,17 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKey STableScanInfo* pTableScanInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - // Only when all tables are scanned can you determine how many groups the tag has - bool outputAll = true; // The read handle is not initialized yet, since no qualified tables exists if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) { return NULL; } + if (TABLE_COUNT_STATE_NONE == pTableScanInfo->countState) { + initRemainTable(pTableScanInfo, pList, num); + pTableScanInfo->countState = TABLE_COUNT_STATE_SCAN; + } + // do the ascending order traverse in the first place. while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { SSDataBlock* p = doTableScanImpl(pOperator); @@ -829,37 +850,28 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKey } if (pTableScanInfo->needCountEmptyTable) { - // pList is NULL in mode TABLE_SCAN__TABLE_ORDER for streamscan, no need to process - // pList not NULL, group by group, num >= 1 - int32_t tb_cnt = taosHashGetSize(pTableScanInfo->pValuedTables); - if (pList && num > tb_cnt) { - if (!pTableScanInfo->processingEmptyTable) { - pTableScanInfo->processingEmptyTable = true; - pTableScanInfo->currentTable = 0; - } - if (pTableScanInfo->currentTable < num) { - if (outputAll) { - // loop: get empty table uid & process - while (pTableScanInfo->currentTable < num) { - const STableKeyInfo* info = pList + pTableScanInfo->currentTable++; - if (pTableScanInfo->pValuedTables && - NULL != taosHashGet(pTableScanInfo->pValuedTables, &info->uid, sizeof(info->uid))) { - } else { - return getBlockForEmptyTable(pOperator, info); - } - } - } else if (tb_cnt == 0) { - // only need one & all empty table in this group - // output first one - pTableScanInfo->currentTable = num; - return getBlockForEmptyTable(pOperator, pList); + int32_t tb_cnt = taosHashGetSize(pTableScanInfo->pRemainTables); + if (tb_cnt) { + if (!pTableScanInfo->isOneGroup) { + // get first empty table uid, mark processed & rm from hash + void *pIte = taosHashIterate(pTableScanInfo->pRemainTables, NULL); + if (pIte != NULL) { + size_t keySize = 0; + uint64_t* pUid = taosHashGetKey(pIte, &keySize); + STableKeyInfo info = {.uid = *pUid, .groupId = *(uint64_t*)pIte}; + taosHashCancelIterate(pTableScanInfo->pRemainTables, pIte); + markTableProcessed(pTableScanInfo, *pUid); + return getBlockForEmptyTable(pOperator, &info); } + } else { + // output one table for this group + taosHashClear(pTableScanInfo->pRemainTables); + return getBlockForEmptyTable(pOperator, pList); } } - pTableScanInfo->processingEmptyTable = false; + pTableScanInfo->countState = TABLE_COUNT_STATE_END; } - taosHashClear(pTableScanInfo->pValuedTables); return NULL; } @@ -937,7 +949,8 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { int32_t num = 0; STableKeyInfo* pList = NULL; tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); - + pInfo->countState = TABLE_COUNT_STATE_NONE; + pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num); pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; @@ -968,6 +981,7 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { } tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); + pInfo->countState = TABLE_COUNT_STATE_NONE; ASSERT(pInfo->base.dataReader == NULL); int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, @@ -1034,6 +1048,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { int32_t numOfTables = 0; // tableListGetSize(pTaskInfo->pTableListInfo); STableKeyInfo tInfo = {0}; + pInfo->countState = TABLE_COUNT_STATE_END; while (1) { SSDataBlock* result = doGroupedTableScan(pOperator, NULL, 0); @@ -1096,7 +1111,7 @@ static void destroyTableScanOperatorInfo(void* param) { STableScanInfo* pTableScanInfo = (STableScanInfo*)param; blockDataDestroy(pTableScanInfo->pResBlock); taosHashCleanup(pTableScanInfo->pIgnoreTables); - taosHashCleanup(pTableScanInfo->pValuedTables); + taosHashCleanup(pTableScanInfo->pRemainTables); destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI); taosMemoryFreeClear(param); } @@ -1161,7 +1176,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->exprSupp.numOfExprs = numOfCols; pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable; - pInfo->processingEmptyTable = false; pInfo->base.pTableListInfo = pTableListInfo; pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);