diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e4f6b42224..17ab756933 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -1077,7 +1077,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); bool groupbyTbname(SNodeList* pGroupList); -int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort); +int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 877dc5ec27..6b598d7bd3 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -741,7 +741,8 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis int32_t len = (int32_t)(pStart - (char*)keyBuf); info->groupId = calcGroupId(keyBuf, len); - taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); + + taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &i, sizeof(int32_t)); } // int64_t st2 = taosGetTimestampUs(); @@ -1665,29 +1666,29 @@ uint64_t getTotalTables(const STableListInfo* pTableList) { } uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { - if (pTableList->oneTableForEachGroup) { - return tableUid; - } + ASSERT(pTableList->map != NULL); + int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); + ASSERT(slot != NULL); - uint64_t* groupId = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); - if (groupId != NULL) { - return *groupId; - } else { - return 0; - } + STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot); + ASSERT(pKeyInfo->uid == tableUid); + + return pKeyInfo->groupId; } int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) { - STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; - taosArrayPush(pTableList->pTableList, &keyInfo); - if (!pTableList->oneTableForEachGroup) { - if (pTableList->map == NULL) { - pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - } - - taosHashPut(pTableList->map, &uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); + if (pTableList->map == NULL) { + ASSERT(taosArrayGetSize(pTableList->pTableList) == 0); + pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); } + STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; + taosArrayPush(pTableList->pTableList, &keyInfo); + + int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1; + taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot)); + + qDebug("uid:%"PRIu64", groupId:%"PRIu64" added into table list, slot:%d, %d", uid, gid, slot, slot + 1); return TSDB_CODE_SUCCESS; } @@ -1724,6 +1725,7 @@ int32_t getNumOfOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; } +// todo remove it bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 7f04623612..765968999a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -360,6 +360,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo if (!exists) { #endif + addTableIntoTableList(pTableListInfo, keyInfo.uid, keyInfo.groupId); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 00fef26670..b5c6c5cf39 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3489,11 +3489,8 @@ bool groupbyTbname(SNodeList* pGroupList) { return bytbname; } -int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) { +int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) { int32_t code = TSDB_CODE_SUCCESS; - if (group == NULL) { - return code; - } pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (pTableListInfo->map == NULL) { @@ -3501,19 +3498,22 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, return code; } - bool assignUid = groupbyTbname(group); + bool groupByTbname = groupbyTbname(group); size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); - - if (assignUid) { // in case of group/partition by tbname, the group id is equalled to the uid of table + if (group == NULL || groupByTbname) { for (int32_t i = 0; i < numOfTables; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - info->groupId = info->uid; - taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); + info->groupId = groupByTbname? info->uid:0; + + taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &i, sizeof(int32_t)); } - pTableListInfo->oneTableForEachGroup = true; - if (groupSort) { + pTableListInfo->oneTableForEachGroup = groupByTbname; + + if (groupSort && groupByTbname) { pTableListInfo->numOfOuputGroups = numOfTables; + } else { + pTableListInfo->numOfOuputGroups = 1; } } else { code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6935a9ab93..7524a4ea1a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1137,12 +1137,13 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, } static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) { - SHashObj* map = pInfo->pTableScanOp->pTaskInfo->tableqinfoList.map; - uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t)); - if (groupId) { - return *groupId; - } - return 0; + return getTableGroupId(&pInfo->pTableScanOp->pTaskInfo->tableqinfoList, uid); +// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->tableqinfoList.map; +// uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t)); +// if (groupId) { +// return *groupId; +// } +// return 0; } static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { @@ -1561,12 +1562,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.version = pBlock->info.version; - uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupIdPre) { - pInfo->pRes->info.groupId = *groupIdPre; - } else { - pInfo->pRes->info.groupId = 0; - } + pInfo->pRes->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { @@ -4236,7 +4232,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags } pTableListInfo->numOfOuputGroups = 1; - code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags, groupSort); + code = setGroupIdMapForAllTables(pTableListInfo, pHandle, pGroupTags, groupSort); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4566,10 +4562,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { continue; } - uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupId) { - pBlock->info.groupId = *groupId; - } + pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -4626,11 +4619,7 @@ static SSDataBlock* getTableDataBlock2(void* param) { continue; } - uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupId) { - pBlock->info.groupId = *groupId; - } - + pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid); pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -4684,11 +4673,7 @@ static SSDataBlock* getTableDataBlock(void* param) { continue; } - uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupId) { - pBlock->info.groupId = *groupId; - } - + pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid); pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;