Merge pull request #14126 from taosdata/szhou/feature/sort-group-2
feat: prepare for multiple group tsdb reads
This commit is contained in:
commit
cc3543d4e3
|
@ -195,6 +195,7 @@ struct SVnodeCfg {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
TSKEY lastKey;
|
TSKEY lastKey;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
|
uint64_t groupId;
|
||||||
} STableKeyInfo;
|
} STableKeyInfo;
|
||||||
|
|
||||||
struct SMetaEntry {
|
struct SMetaEntry {
|
||||||
|
|
|
@ -2845,7 +2845,7 @@ int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id};
|
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id, .groupId = 0};
|
||||||
taosArrayPush(list, &info);
|
taosArrayPush(list, &info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -838,14 +838,19 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
|
||||||
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
|
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
|
||||||
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||||
|
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId,
|
||||||
|
SNode* pTagCond);
|
||||||
|
int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||||
STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId,
|
STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId,
|
||||||
uint64_t taskId, SNode* pTagCond);
|
uint64_t taskId);
|
||||||
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SArray* dataReaders,
|
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SArray* dataReaders,
|
||||||
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
|
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
||||||
|
|
||||||
|
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -239,7 +239,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(res); i++) {
|
for (int i = 0; i < taosArrayGetSize(res); i++) {
|
||||||
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)};
|
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0};
|
||||||
taosArrayPush(pListInfo->pTableList, &info);
|
taosArrayPush(pListInfo->pTableList, &info);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(res);
|
taosArrayDestroy(res);
|
||||||
|
@ -247,7 +247,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
|
||||||
code = tsdbGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
|
code = tsdbGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
|
||||||
}
|
}
|
||||||
} else { // Create one table group.
|
} else { // Create one table group.
|
||||||
STableKeyInfo info = {.lastKey = 0, .uid = tableUid};
|
STableKeyInfo info = {.lastKey = 0, .uid = tableUid, .groupId = 0};
|
||||||
taosArrayPush(pListInfo->pTableList, &info);
|
taosArrayPush(pListInfo->pTableList, &info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3968,14 +3968,16 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
||||||
uint64_t* groupId = taosHashGet(pTableListInfo->map, keyBuf, len);
|
|
||||||
if (groupId) {
|
uint64_t* pGroupId = taosHashGet(pTableListInfo->map, keyBuf, len);
|
||||||
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), groupId, sizeof(uint64_t));
|
|
||||||
} else {
|
if (!pGroupId) {
|
||||||
uint64_t tmpId = calcGroupId(keyBuf, len);
|
uint64_t tmpId = calcGroupId(keyBuf, len);
|
||||||
|
info->groupId = tmpId;
|
||||||
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &tmpId, sizeof(uint64_t));
|
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &tmpId, sizeof(uint64_t));
|
||||||
|
} else {
|
||||||
|
info->groupId = *pGroupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
@ -4023,11 +4025,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
SArray* dataReaders = taosArrayInit(8, POINTER_BYTES);
|
SArray* dataReaders = taosArrayInit(8, POINTER_BYTES);
|
||||||
createMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId, pTagCond);
|
createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId, pTagCond);
|
||||||
|
doCreateMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId);
|
||||||
|
|
||||||
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
||||||
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
|
|
||||||
generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
|
|
||||||
taosArrayDestroy(groupKeys);
|
|
||||||
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo);
|
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo);
|
||||||
STableScanInfo* pScanInfo = pOperator->info;
|
STableScanInfo* pScanInfo = pOperator->info;
|
||||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||||
|
@ -4097,7 +4099,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
} else { // Create one table group.
|
} else { // Create one table group.
|
||||||
STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid};
|
STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid, .groupId = 0};
|
||||||
taosArrayPush(pTableListInfo->pTableList, &info);
|
taosArrayPush(pTableListInfo->pTableList, &info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -616,7 +616,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) {
|
||||||
metaGetTableEntryByUid(&mr, uid);
|
metaGetTableEntryByUid(&mr, uid);
|
||||||
if (mr.me.type == TSDB_SUPER_TABLE) {
|
if (mr.me.type == TSDB_SUPER_TABLE) {
|
||||||
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
|
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
|
rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
|
||||||
}
|
}
|
||||||
} else if (mr.me.type == TSDB_CHILD_TABLE) {
|
} else if (mr.me.type == TSDB_CHILD_TABLE) {
|
||||||
|
@ -624,12 +624,12 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) {
|
||||||
metaGetTableEntryByUid(&mr, suid);
|
metaGetTableEntryByUid(&mr, suid);
|
||||||
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
|
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
|
rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
|
||||||
}
|
}
|
||||||
} else if (mr.me.type == TSDB_NORMAL_TABLE) {
|
} else if (mr.me.type == TSDB_NORMAL_TABLE) {
|
||||||
int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols;
|
int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols;
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
rowLen += mr.me.ntbEntry.schemaRow.pSchema[i].bytes;
|
rowLen += mr.me.ntbEntry.schemaRow.pSchema[i].bytes;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1878,7 +1878,6 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
|
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
|
||||||
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
|
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
|
||||||
|
|
||||||
|
|
||||||
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
|
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -1963,25 +1962,42 @@ typedef struct STableMergeScanInfo {
|
||||||
|
|
||||||
} STableMergeScanInfo;
|
} STableMergeScanInfo;
|
||||||
|
|
||||||
int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
|
||||||
STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId,
|
const STableKeyInfo* info1 = p1;
|
||||||
uint64_t taskId, SNode* pTagCond) {
|
const STableKeyInfo* info2 = p2;
|
||||||
|
return info1->groupId - info2->groupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||||
|
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId,
|
||||||
|
SNode* pTagCond) {
|
||||||
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond);
|
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
|
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
|
||||||
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
|
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
|
||||||
goto _error;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
|
||||||
|
generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
|
||||||
|
if (groupKeys) {
|
||||||
|
taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(groupKeys);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||||
|
STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId,
|
||||||
|
uint64_t taskId) {
|
||||||
|
|
||||||
SQueryTableDataCond cond = {0};
|
SQueryTableDataCond cond = {0};
|
||||||
code = initQueryTableDataCond(&cond, pTableScanNode);
|
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
// TODO: free the sublist info and the table list in it
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) {
|
||||||
STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo));
|
STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo));
|
||||||
subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
|
subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||||
|
@ -1995,7 +2011,7 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand
|
||||||
}
|
}
|
||||||
cleanupQueryTableDataCond(&cond);
|
cleanupQueryTableDataCond(&cond);
|
||||||
|
|
||||||
return 0;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue