remainGroups
This commit is contained in:
parent
673d3bc376
commit
85ba9a4a5f
|
@ -105,6 +105,7 @@ typedef struct STableListInfo {
|
|||
int32_t* groupOffset; // keep the offset value for each group in the tableList
|
||||
SArray* pTableList;
|
||||
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
||||
SHashObj* remainGroups; // remaining group has not yet processed the empty group
|
||||
STableListIdInfo idInfo; // this maybe the super table or ordinary table
|
||||
} STableListInfo;
|
||||
|
||||
|
|
|
@ -218,9 +218,10 @@ enum {
|
|||
};
|
||||
|
||||
typedef enum ETableCountState {
|
||||
TABLE_COUNT_STATE_NONE = 0, // before start scan
|
||||
TABLE_COUNT_STATE_SCAN = 1, // scanning
|
||||
TABLE_COUNT_STATE_END = 2, // finish or noneed to process
|
||||
TABLE_COUNT_STATE_NONE = 0, // before start scan
|
||||
TABLE_COUNT_STATE_SCAN = 1, // cur group scanning
|
||||
TABLE_COUNT_STATE_PROCESSED = 2, // cur group processed
|
||||
TABLE_COUNT_STATE_END = 3, // finish or noneed to process
|
||||
} ETableCountState;
|
||||
|
||||
typedef struct SAggSupporter {
|
||||
|
|
|
@ -456,7 +456,7 @@ static void genTbGroupDigest(const SNode* pGroup, uint8_t* filterDigest, T_MD5_C
|
|||
}
|
||||
|
||||
int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo, uint8_t* digest,
|
||||
SStorageAPI* pAPI) {
|
||||
SStorageAPI* pAPI, bool initRemainGroups) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SArray* pBlockList = NULL;
|
||||
SSDataBlock* pResBlock = NULL;
|
||||
|
@ -590,6 +590,15 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
|
|||
goto end;
|
||||
}
|
||||
|
||||
if (initRemainGroups) {
|
||||
pTableListInfo->remainGroups =
|
||||
taosHashInit(rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
if (pTableListInfo->remainGroups == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < rows; i++) {
|
||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||
|
||||
|
@ -631,6 +640,14 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
|
|||
|
||||
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
||||
info->groupId = calcGroupId(keyBuf, len);
|
||||
if (initRemainGroups) {
|
||||
// groupId ~ table uid
|
||||
taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid), sizeof(info->uid));
|
||||
}
|
||||
}
|
||||
|
||||
if (initRemainGroups) {
|
||||
pTableListInfo->numOfOuputGroups = taosHashGetSize(pTableListInfo->remainGroups);
|
||||
}
|
||||
|
||||
if (tsTagFilterCache) {
|
||||
|
@ -2025,6 +2042,7 @@ STableListInfo* tableListCreate() {
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
pListInfo->remainGroups = NULL;
|
||||
|
||||
pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
|
||||
if (pListInfo->pTableList == NULL) {
|
||||
|
@ -2054,7 +2072,7 @@ void* tableListDestroy(STableListInfo* pTableListInfo) {
|
|||
taosMemoryFreeClear(pTableListInfo->groupOffset);
|
||||
|
||||
taosHashCleanup(pTableListInfo->map);
|
||||
|
||||
taosHashCleanup(pTableListInfo->remainGroups);
|
||||
pTableListInfo->pTableList = NULL;
|
||||
pTableListInfo->map = NULL;
|
||||
taosMemoryFree(pTableListInfo);
|
||||
|
@ -2068,6 +2086,7 @@ void tableListClear(STableListInfo* pTableListInfo) {
|
|||
|
||||
taosArrayClear(pTableListInfo->pTableList);
|
||||
taosHashClear(pTableListInfo->map);
|
||||
taosHashClear(pTableListInfo->remainGroups);
|
||||
taosMemoryFree(pTableListInfo->groupOffset);
|
||||
pTableListInfo->numOfOuputGroups = 1;
|
||||
pTableListInfo->oneTableForEachGroup = false;
|
||||
|
@ -2116,70 +2135,6 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t arrangeTableGroup(STableListInfo* pTableListInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
||||
SArray* pDup = NULL;
|
||||
SHashObj* pHashObj = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (!pHashObj) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
int32_t num = 1;
|
||||
// first: get the number of tables per group
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
||||
int32_t* pVal = taosHashGet(pHashObj, &pInfo->groupId, sizeof(pInfo->groupId));
|
||||
// update each group's table count
|
||||
if (!pVal) {
|
||||
taosHashPut(pHashObj, &pInfo->groupId, sizeof(pInfo->groupId), &num, sizeof(num));
|
||||
} else {
|
||||
(*pVal)++;
|
||||
}
|
||||
}
|
||||
pTableListInfo->numOfOuputGroups = taosHashGetSize(pHashObj);
|
||||
pTableListInfo->oneTableForEachGroup = (pTableListInfo->numOfOuputGroups == numOfTables);
|
||||
|
||||
if (pTableListInfo->numOfOuputGroups > 1 && pTableListInfo->numOfOuputGroups < numOfTables) {
|
||||
pDup = taosArrayDup(pTableListInfo->pTableList, NULL);
|
||||
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
|
||||
if (pDup == NULL || pTableListInfo->groupOffset == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
int32_t idx = 0, sum = 0;
|
||||
|
||||
void* pIter = taosHashIterate(pHashObj, NULL);
|
||||
while (pIter != NULL) {
|
||||
// record each group's offset
|
||||
pTableListInfo->groupOffset[idx] = sum;
|
||||
int32_t* pData = (int32_t*)pIter;
|
||||
sum += *pData;
|
||||
// change value to record group item's first position
|
||||
*pData = pTableListInfo->groupOffset[idx++];
|
||||
pIter = taosHashIterate(pHashObj, pIter);
|
||||
}
|
||||
|
||||
// second: arrange the tables and put the items with same groupId together
|
||||
STableKeyInfo* pStart = taosArrayGet(pTableListInfo->pTableList, 0);
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableKeyInfo* pInfo = taosArrayGet(pDup, i);
|
||||
int32_t* pVal = taosHashGet(pHashObj, &pInfo->groupId, sizeof(pInfo->groupId));
|
||||
if (*pVal != i) {
|
||||
pStart[*pVal] = *pInfo;
|
||||
}
|
||||
(*pVal)++; // update to next item's position
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
taosHashCleanup(pHashObj);
|
||||
taosArrayDestroy(pDup);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFreeClear(pTableListInfo->groupOffset);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode,
|
||||
SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -2206,19 +2161,21 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
|||
pTableListInfo->numOfOuputGroups = 1;
|
||||
}
|
||||
} else {
|
||||
code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI);
|
||||
bool initRemainGroups = false;
|
||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
|
||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
|
||||
if (pTableScanNode->needCountEmptyTable) {
|
||||
initRemainGroups = true;
|
||||
}
|
||||
}
|
||||
|
||||
code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI, initRemainGroups);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (groupSort || pScanNode->groupOrderScan) {
|
||||
code = sortTableGroup(pTableListInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
|
||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
|
||||
if (pTableScanNode->needCountEmptyTable) {
|
||||
// only put together tables with the same groupid
|
||||
arrangeTableGroup(pTableListInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -673,14 +673,7 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i
|
|||
pInfo->tableEndIndex = numOfTables - 1;
|
||||
}
|
||||
} else {
|
||||
int32_t i = pInfo->tableStartIndex + 1;
|
||||
for (; i < numOfTables; ++i) {
|
||||
STableKeyInfo* pCur = tableListGetInfo(pTableListInfo, i);
|
||||
if (pCur->groupId != pStart->groupId) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
pInfo->tableEndIndex = i - 1;
|
||||
pInfo->tableEndIndex = numOfTables - 1;
|
||||
}
|
||||
|
||||
if (!pInfo->needCountEmptyTable) {
|
||||
|
@ -693,6 +686,17 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i
|
|||
*size = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
|
||||
}
|
||||
|
||||
void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) {
|
||||
if (pInfo->countState == TABLE_COUNT_STATE_END) {
|
||||
return;
|
||||
}
|
||||
if (pInfo->base.pTableListInfo->oneTableForEachGroup) {
|
||||
pInfo->countState = TABLE_COUNT_STATE_PROCESSED;
|
||||
} else {
|
||||
taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId));
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBase* pBase, SSDataBlock* pBlock,
|
||||
const STableKeyInfo* tbInfo) {
|
||||
blockDataEmpty(pBlock);
|
||||
|
@ -802,7 +806,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
|||
while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||
if (p != NULL) {
|
||||
pTableScanInfo->countState = TABLE_COUNT_STATE_END;
|
||||
markGroupProcessed(pTableScanInfo, p->info.id.groupId);
|
||||
return p;
|
||||
}
|
||||
|
||||
|
@ -831,7 +835,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
|||
while (pTableScanInfo->scanTimes < total) {
|
||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||
if (p != NULL) {
|
||||
pTableScanInfo->countState = TABLE_COUNT_STATE_END;
|
||||
markGroupProcessed(pTableScanInfo, p->info.id.groupId);
|
||||
return p;
|
||||
}
|
||||
|
||||
|
@ -849,11 +853,30 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
if (pTableScanInfo->countState < TABLE_COUNT_STATE_END) {
|
||||
// output once for this group
|
||||
STableListInfo* pTableListInfo = pTableScanInfo->base.pTableListInfo;
|
||||
if (pTableListInfo->oneTableForEachGroup) { // group by tbname
|
||||
if (pTableScanInfo->countState < TABLE_COUNT_STATE_PROCESSED) {
|
||||
pTableScanInfo->countState = TABLE_COUNT_STATE_PROCESSED;
|
||||
STableKeyInfo* pStart =
|
||||
(STableKeyInfo*)tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->tableStartIndex);
|
||||
return getBlockForEmptyTable(pOperator, pStart);
|
||||
}
|
||||
} else { // group by tag
|
||||
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
||||
if (pTableScanInfo->tableEndIndex + 1 >= numOfTables) {
|
||||
// get empty group, mark processed & rm from hash
|
||||
void* pIte = taosHashIterate(pTableListInfo->remainGroups, NULL);
|
||||
if (pIte != NULL) {
|
||||
size_t keySize = 0;
|
||||
uint64_t* pGroupId = taosHashGetKey(pIte, &keySize);
|
||||
STableKeyInfo info = {.uid = *(uint64_t*)pIte, .groupId = *pGroupId};
|
||||
taosHashCancelIterate(pTableListInfo->remainGroups, pIte);
|
||||
markGroupProcessed(pTableScanInfo, *pGroupId);
|
||||
return getBlockForEmptyTable(pOperator, &info);
|
||||
}
|
||||
}
|
||||
}
|
||||
pTableScanInfo->countState = TABLE_COUNT_STATE_END;
|
||||
STableKeyInfo* pStart =
|
||||
(STableKeyInfo*)tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->tableStartIndex);
|
||||
return getBlockForEmptyTable(pOperator, pStart);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
|
Loading…
Reference in New Issue