arrangeTableGroup
This commit is contained in:
parent
7e1d59c565
commit
673d3bc376
|
@ -2116,6 +2116,70 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t arrangeTableGroup(STableListInfo* pTableListInfo) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
||||||
|
SArray* pDup = NULL;
|
||||||
|
SHashObj* pHashObj = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
|
if (!pHashObj) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
int32_t num = 1;
|
||||||
|
// first: get the number of tables per group
|
||||||
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
|
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
|
int32_t* pVal = taosHashGet(pHashObj, &pInfo->groupId, sizeof(pInfo->groupId));
|
||||||
|
// update each group's table count
|
||||||
|
if (!pVal) {
|
||||||
|
taosHashPut(pHashObj, &pInfo->groupId, sizeof(pInfo->groupId), &num, sizeof(num));
|
||||||
|
} else {
|
||||||
|
(*pVal)++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pTableListInfo->numOfOuputGroups = taosHashGetSize(pHashObj);
|
||||||
|
pTableListInfo->oneTableForEachGroup = (pTableListInfo->numOfOuputGroups == numOfTables);
|
||||||
|
|
||||||
|
if (pTableListInfo->numOfOuputGroups > 1 && pTableListInfo->numOfOuputGroups < numOfTables) {
|
||||||
|
pDup = taosArrayDup(pTableListInfo->pTableList, NULL);
|
||||||
|
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
|
||||||
|
if (pDup == NULL || pTableListInfo->groupOffset == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
int32_t idx = 0, sum = 0;
|
||||||
|
|
||||||
|
void* pIter = taosHashIterate(pHashObj, NULL);
|
||||||
|
while (pIter != NULL) {
|
||||||
|
// record each group's offset
|
||||||
|
pTableListInfo->groupOffset[idx] = sum;
|
||||||
|
int32_t* pData = (int32_t*)pIter;
|
||||||
|
sum += *pData;
|
||||||
|
// change value to record group item's first position
|
||||||
|
*pData = pTableListInfo->groupOffset[idx++];
|
||||||
|
pIter = taosHashIterate(pHashObj, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
// second: arrange the tables and put the items with same groupId together
|
||||||
|
STableKeyInfo* pStart = taosArrayGet(pTableListInfo->pTableList, 0);
|
||||||
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
|
STableKeyInfo* pInfo = taosArrayGet(pDup, i);
|
||||||
|
int32_t* pVal = taosHashGet(pHashObj, &pInfo->groupId, sizeof(pInfo->groupId));
|
||||||
|
if (*pVal != i) {
|
||||||
|
pStart[*pVal] = *pInfo;
|
||||||
|
}
|
||||||
|
(*pVal)++; // update to next item's position
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
taosHashCleanup(pHashObj);
|
||||||
|
taosArrayDestroy(pDup);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosMemoryFreeClear(pTableListInfo->groupOffset);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode,
|
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode,
|
||||||
SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI) {
|
SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -2149,6 +2213,12 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
||||||
|
|
||||||
if (groupSort || pScanNode->groupOrderScan) {
|
if (groupSort || pScanNode->groupOrderScan) {
|
||||||
code = sortTableGroup(pTableListInfo);
|
code = sortTableGroup(pTableListInfo);
|
||||||
|
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
|
||||||
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
|
||||||
|
if (pTableScanNode->needCountEmptyTable) {
|
||||||
|
// only put together tables with the same groupid
|
||||||
|
arrangeTableGroup(pTableListInfo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -302,7 +302,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
|
||||||
pTableListInfo->idInfo.suid = pTableScanNode->scan.suid;
|
pTableListInfo->idInfo.suid = pTableScanNode->scan.suid;
|
||||||
pTableListInfo->idInfo.tableType = pTableScanNode->scan.tableType;
|
pTableListInfo->idInfo.tableType = pTableScanNode->scan.tableType;
|
||||||
} else {
|
} else {
|
||||||
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
|
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
|
||||||
pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
|
pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
|
||||||
if (code) {
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
|
Loading…
Reference in New Issue