From 85ba9a4a5fe2332dd3fc889ee71e8f4e0b96fc20 Mon Sep 17 00:00:00 2001 From: Bob Liu Date: Tue, 2 Jan 2024 15:45:03 +0800 Subject: [PATCH] remainGroups --- source/libs/executor/inc/executil.h | 1 + source/libs/executor/inc/executorInt.h | 7 +- source/libs/executor/src/executil.c | 103 +++++++----------------- source/libs/executor/src/scanoperator.c | 51 ++++++++---- 4 files changed, 72 insertions(+), 90 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 946d3311a4..640ed2f2f9 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -105,6 +105,7 @@ typedef struct STableListInfo { int32_t* groupOffset; // keep the offset value for each group in the tableList SArray* pTableList; SHashObj* map; // speedup acquire the tableQueryInfo by table uid + SHashObj* remainGroups; // remaining group has not yet processed the empty group STableListIdInfo idInfo; // this maybe the super table or ordinary table } STableListInfo; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c09f615735..bee378effc 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -218,9 +218,10 @@ enum { }; typedef enum ETableCountState { - TABLE_COUNT_STATE_NONE = 0, // before start scan - TABLE_COUNT_STATE_SCAN = 1, // scanning - TABLE_COUNT_STATE_END = 2, // finish or noneed to process + TABLE_COUNT_STATE_NONE = 0, // before start scan + TABLE_COUNT_STATE_SCAN = 1, // cur group scanning + TABLE_COUNT_STATE_PROCESSED = 2, // cur group processed + TABLE_COUNT_STATE_END = 3, // finish or noneed to process } ETableCountState; typedef struct SAggSupporter { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index f3e2e99332..fe3528cce6 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -456,7 +456,7 @@ static void genTbGroupDigest(const SNode* pGroup, uint8_t* filterDigest, T_MD5_C } int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo, uint8_t* digest, - SStorageAPI* pAPI) { + SStorageAPI* pAPI, bool initRemainGroups) { int32_t code = TSDB_CODE_SUCCESS; SArray* pBlockList = NULL; SSDataBlock* pResBlock = NULL; @@ -590,6 +590,15 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf goto end; } + if (initRemainGroups) { + pTableListInfo->remainGroups = + taosHashInit(rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + if (pTableListInfo->remainGroups == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + } + for (int i = 0; i < rows; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); @@ -631,6 +640,14 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf int32_t len = (int32_t)(pStart - (char*)keyBuf); info->groupId = calcGroupId(keyBuf, len); + if (initRemainGroups) { + // groupId ~ table uid + taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid), sizeof(info->uid)); + } + } + + if (initRemainGroups) { + pTableListInfo->numOfOuputGroups = taosHashGetSize(pTableListInfo->remainGroups); } if (tsTagFilterCache) { @@ -2025,6 +2042,7 @@ STableListInfo* tableListCreate() { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pListInfo->remainGroups = NULL; pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); if (pListInfo->pTableList == NULL) { @@ -2054,7 +2072,7 @@ void* tableListDestroy(STableListInfo* pTableListInfo) { taosMemoryFreeClear(pTableListInfo->groupOffset); taosHashCleanup(pTableListInfo->map); - + taosHashCleanup(pTableListInfo->remainGroups); pTableListInfo->pTableList = NULL; pTableListInfo->map = NULL; taosMemoryFree(pTableListInfo); @@ -2068,6 +2086,7 @@ void tableListClear(STableListInfo* pTableListInfo) { taosArrayClear(pTableListInfo->pTableList); taosHashClear(pTableListInfo->map); + taosHashClear(pTableListInfo->remainGroups); taosMemoryFree(pTableListInfo->groupOffset); pTableListInfo->numOfOuputGroups = 1; pTableListInfo->oneTableForEachGroup = false; @@ -2116,70 +2135,6 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) { return TSDB_CODE_SUCCESS; } -static int32_t arrangeTableGroup(STableListInfo* pTableListInfo) { - int32_t code = TSDB_CODE_SUCCESS; - size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); - SArray* pDup = NULL; - SHashObj* pHashObj = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if (!pHashObj) { - return TSDB_CODE_OUT_OF_MEMORY; - } - int32_t num = 1; - // first: get the number of tables per group - for (int32_t i = 0; i < numOfTables; ++i) { - STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, i); - int32_t* pVal = taosHashGet(pHashObj, &pInfo->groupId, sizeof(pInfo->groupId)); - // update each group's table count - if (!pVal) { - taosHashPut(pHashObj, &pInfo->groupId, sizeof(pInfo->groupId), &num, sizeof(num)); - } else { - (*pVal)++; - } - } - pTableListInfo->numOfOuputGroups = taosHashGetSize(pHashObj); - pTableListInfo->oneTableForEachGroup = (pTableListInfo->numOfOuputGroups == numOfTables); - - if (pTableListInfo->numOfOuputGroups > 1 && pTableListInfo->numOfOuputGroups < numOfTables) { - pDup = taosArrayDup(pTableListInfo->pTableList, NULL); - pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups); - if (pDup == NULL || pTableListInfo->groupOffset == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - int32_t idx = 0, sum = 0; - - void* pIter = taosHashIterate(pHashObj, NULL); - while (pIter != NULL) { - // record each group's offset - pTableListInfo->groupOffset[idx] = sum; - int32_t* pData = (int32_t*)pIter; - sum += *pData; - // change value to record group item's first position - *pData = pTableListInfo->groupOffset[idx++]; - pIter = taosHashIterate(pHashObj, pIter); - } - - // second: arrange the tables and put the items with same groupId together - STableKeyInfo* pStart = taosArrayGet(pTableListInfo->pTableList, 0); - for (int32_t i = 0; i < numOfTables; ++i) { - STableKeyInfo* pInfo = taosArrayGet(pDup, i); - int32_t* pVal = taosHashGet(pHashObj, &pInfo->groupId, sizeof(pInfo->groupId)); - if (*pVal != i) { - pStart[*pVal] = *pInfo; - } - (*pVal)++; // update to next item's position - } - } - -_exit: - taosHashCleanup(pHashObj); - taosArrayDestroy(pDup); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pTableListInfo->groupOffset); - } - return code; -} - int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode, SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI) { int32_t code = TSDB_CODE_SUCCESS; @@ -2206,19 +2161,21 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pTableListInfo->numOfOuputGroups = 1; } } else { - code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI); + bool initRemainGroups = false; + if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) { + STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode; + if (pTableScanNode->needCountEmptyTable) { + initRemainGroups = true; + } + } + + code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI, initRemainGroups); if (code != TSDB_CODE_SUCCESS) { return code; } if (groupSort || pScanNode->groupOrderScan) { code = sortTableGroup(pTableListInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) { - STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode; - if (pTableScanNode->needCountEmptyTable) { - // only put together tables with the same groupid - arrangeTableGroup(pTableListInfo); - } } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 27c9a76757..8999340ddc 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -673,14 +673,7 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i pInfo->tableEndIndex = numOfTables - 1; } } else { - int32_t i = pInfo->tableStartIndex + 1; - for (; i < numOfTables; ++i) { - STableKeyInfo* pCur = tableListGetInfo(pTableListInfo, i); - if (pCur->groupId != pStart->groupId) { - break; - } - } - pInfo->tableEndIndex = i - 1; + pInfo->tableEndIndex = numOfTables - 1; } if (!pInfo->needCountEmptyTable) { @@ -693,6 +686,17 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i *size = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; } +void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) { + if (pInfo->countState == TABLE_COUNT_STATE_END) { + return; + } + if (pInfo->base.pTableListInfo->oneTableForEachGroup) { + pInfo->countState = TABLE_COUNT_STATE_PROCESSED; + } else { + taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId)); + } +} + static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBase* pBase, SSDataBlock* pBlock, const STableKeyInfo* tbInfo) { blockDataEmpty(pBlock); @@ -802,7 +806,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { SSDataBlock* p = doTableScanImpl(pOperator); if (p != NULL) { - pTableScanInfo->countState = TABLE_COUNT_STATE_END; + markGroupProcessed(pTableScanInfo, p->info.id.groupId); return p; } @@ -831,7 +835,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { while (pTableScanInfo->scanTimes < total) { SSDataBlock* p = doTableScanImpl(pOperator); if (p != NULL) { - pTableScanInfo->countState = TABLE_COUNT_STATE_END; + markGroupProcessed(pTableScanInfo, p->info.id.groupId); return p; } @@ -849,11 +853,30 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { } if (pTableScanInfo->countState < TABLE_COUNT_STATE_END) { - // output once for this group + STableListInfo* pTableListInfo = pTableScanInfo->base.pTableListInfo; + if (pTableListInfo->oneTableForEachGroup) { // group by tbname + if (pTableScanInfo->countState < TABLE_COUNT_STATE_PROCESSED) { + pTableScanInfo->countState = TABLE_COUNT_STATE_PROCESSED; + STableKeyInfo* pStart = + (STableKeyInfo*)tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->tableStartIndex); + return getBlockForEmptyTable(pOperator, pStart); + } + } else { // group by tag + int32_t numOfTables = tableListGetSize(pTableListInfo); + if (pTableScanInfo->tableEndIndex + 1 >= numOfTables) { + // get empty group, mark processed & rm from hash + void* pIte = taosHashIterate(pTableListInfo->remainGroups, NULL); + if (pIte != NULL) { + size_t keySize = 0; + uint64_t* pGroupId = taosHashGetKey(pIte, &keySize); + STableKeyInfo info = {.uid = *(uint64_t*)pIte, .groupId = *pGroupId}; + taosHashCancelIterate(pTableListInfo->remainGroups, pIte); + markGroupProcessed(pTableScanInfo, *pGroupId); + return getBlockForEmptyTable(pOperator, &info); + } + } + } pTableScanInfo->countState = TABLE_COUNT_STATE_END; - STableKeyInfo* pStart = - (STableKeyInfo*)tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->tableStartIndex); - return getBlockForEmptyTable(pOperator, pStart); } return NULL;