fix(query): always build hash map from uid to keyinfo
This commit is contained in:
parent
4cd14c4163
commit
e4de732285
|
@ -1077,7 +1077,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
||||||
|
|
||||||
bool groupbyTbname(SNodeList* pGroupList);
|
bool groupbyTbname(SNodeList* pGroupList);
|
||||||
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort);
|
int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort);
|
||||||
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
|
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
|
||||||
SGroupResInfo* pGroupResInfo);
|
SGroupResInfo* pGroupResInfo);
|
||||||
|
|
|
@ -741,7 +741,8 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
|
||||||
|
|
||||||
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
||||||
info->groupId = calcGroupId(keyBuf, len);
|
info->groupId = calcGroupId(keyBuf, len);
|
||||||
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
|
|
||||||
|
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &i, sizeof(int32_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
// int64_t st2 = taosGetTimestampUs();
|
// int64_t st2 = taosGetTimestampUs();
|
||||||
|
@ -1665,29 +1666,29 @@ uint64_t getTotalTables(const STableListInfo* pTableList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
|
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
|
||||||
if (pTableList->oneTableForEachGroup) {
|
ASSERT(pTableList->map != NULL);
|
||||||
return tableUid;
|
int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
|
||||||
}
|
ASSERT(slot != NULL);
|
||||||
|
|
||||||
uint64_t* groupId = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
|
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
|
||||||
if (groupId != NULL) {
|
ASSERT(pKeyInfo->uid == tableUid);
|
||||||
return *groupId;
|
|
||||||
} else {
|
return pKeyInfo->groupId;
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
|
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
|
||||||
STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
|
if (pTableList->map == NULL) {
|
||||||
taosArrayPush(pTableList->pTableList, &keyInfo);
|
ASSERT(taosArrayGetSize(pTableList->pTableList) == 0);
|
||||||
if (!pTableList->oneTableForEachGroup) {
|
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
if (pTableList->map == NULL) {
|
|
||||||
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosHashPut(pTableList->map, &uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
|
||||||
|
taosArrayPush(pTableList->pTableList, &keyInfo);
|
||||||
|
|
||||||
|
int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
|
||||||
|
taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
|
||||||
|
|
||||||
|
qDebug("uid:%"PRIu64", groupId:%"PRIu64" added into table list, slot:%d, %d", uid, gid, slot, slot + 1);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1724,6 +1725,7 @@ int32_t getNumOfOutputGroups(const STableListInfo* pTableList) {
|
||||||
return pTableList->numOfOuputGroups;
|
return pTableList->numOfOuputGroups;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo remove it
|
||||||
bool oneTableForEachGroup(const STableListInfo* pTableList) {
|
bool oneTableForEachGroup(const STableListInfo* pTableList) {
|
||||||
return pTableList->oneTableForEachGroup;
|
return pTableList->oneTableForEachGroup;
|
||||||
}
|
}
|
||||||
|
|
|
@ -360,6 +360,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
||||||
|
|
||||||
if (!exists) {
|
if (!exists) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
addTableIntoTableList(pTableListInfo, keyInfo.uid, keyInfo.groupId);
|
addTableIntoTableList(pTableListInfo, keyInfo.uid, keyInfo.groupId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3489,11 +3489,8 @@ bool groupbyTbname(SNodeList* pGroupList) {
|
||||||
return bytbname;
|
return bytbname;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) {
|
int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (group == NULL) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
if (pTableListInfo->map == NULL) {
|
if (pTableListInfo->map == NULL) {
|
||||||
|
@ -3501,19 +3498,22 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool assignUid = groupbyTbname(group);
|
bool groupByTbname = groupbyTbname(group);
|
||||||
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
||||||
|
if (group == NULL || groupByTbname) {
|
||||||
if (assignUid) { // in case of group/partition by tbname, the group id is equalled to the uid of table
|
|
||||||
for (int32_t i = 0; i < numOfTables; i++) {
|
for (int32_t i = 0; i < numOfTables; i++) {
|
||||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
info->groupId = info->uid;
|
info->groupId = groupByTbname? info->uid:0;
|
||||||
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
|
|
||||||
|
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &i, sizeof(int32_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableListInfo->oneTableForEachGroup = true;
|
pTableListInfo->oneTableForEachGroup = groupByTbname;
|
||||||
if (groupSort) {
|
|
||||||
|
if (groupSort && groupByTbname) {
|
||||||
pTableListInfo->numOfOuputGroups = numOfTables;
|
pTableListInfo->numOfOuputGroups = numOfTables;
|
||||||
|
} else {
|
||||||
|
pTableListInfo->numOfOuputGroups = 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
|
code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
|
||||||
|
|
|
@ -1137,12 +1137,13 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
|
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
|
||||||
SHashObj* map = pInfo->pTableScanOp->pTaskInfo->tableqinfoList.map;
|
return getTableGroupId(&pInfo->pTableScanOp->pTaskInfo->tableqinfoList, uid);
|
||||||
uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t));
|
// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->tableqinfoList.map;
|
||||||
if (groupId) {
|
// uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t));
|
||||||
return *groupId;
|
// if (groupId) {
|
||||||
}
|
// return *groupId;
|
||||||
return 0;
|
// }
|
||||||
|
// return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
|
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
|
||||||
|
@ -1561,12 +1562,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
pInfo->pRes->info.type = STREAM_NORMAL;
|
pInfo->pRes->info.type = STREAM_NORMAL;
|
||||||
pInfo->pRes->info.version = pBlock->info.version;
|
pInfo->pRes->info.version = pBlock->info.version;
|
||||||
|
|
||||||
uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
pInfo->pRes->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid);
|
||||||
if (groupIdPre) {
|
|
||||||
pInfo->pRes->info.groupId = *groupIdPre;
|
|
||||||
} else {
|
|
||||||
pInfo->pRes->info.groupId = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo extract method
|
// todo extract method
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
||||||
|
@ -4236,7 +4232,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableListInfo->numOfOuputGroups = 1;
|
pTableListInfo->numOfOuputGroups = 1;
|
||||||
code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags, groupSort);
|
code = setGroupIdMapForAllTables(pTableListInfo, pHandle, pGroupTags, groupSort);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -4566,10 +4562,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid);
|
||||||
if (groupId) {
|
|
||||||
pBlock->info.groupId = *groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows;
|
||||||
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
@ -4626,11 +4619,7 @@ static SSDataBlock* getTableDataBlock2(void* param) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid);
|
||||||
if (groupId) {
|
|
||||||
pBlock->info.groupId = *groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
|
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
|
||||||
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
|
||||||
|
@ -4684,11 +4673,7 @@ static SSDataBlock* getTableDataBlock(void* param) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid);
|
||||||
if (groupId) {
|
|
||||||
pBlock->info.groupId = *groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
|
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
|
||||||
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue