feat: add table merge scan operator

This commit is contained in:
shenglian zhou 2022-06-14 19:44:48 +08:00
parent 1b1d1d1c4a
commit 4fa20a1e76
1 changed files with 25 additions and 21 deletions

View File

@ -1791,7 +1791,7 @@ typedef struct STableMergeScanInfo {
uint64_t groupId;
STupleHandle* prefetchedTuple;
SArray* sortSourceParams;
SArray* sortSourceParams;
SFileBlockLoadRecorder readRecorder;
int64_t numOfRows;
@ -1842,12 +1842,17 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
// TODO: free the sublist info and the table list in it
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) {
STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo));
subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i));
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, subListInfo, queryId, taskId);
taosArrayPush(arrayReader, &pReader);
taosArrayDestroy(subListInfo->pTableList);
taosMemoryFree(subListInfo);
}
clearupQueryTableDataCond(&cond);
@ -2042,8 +2047,6 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
pInfo->startTs = taosGetTimestampUs();
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle =
@ -2074,7 +2077,8 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SOperatorInfo* pOperator) {
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -2137,8 +2141,8 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, code);
}
SSDataBlock* pBlock = getSortedTableMergeScanBlockData(
pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator);
SSDataBlock* pBlock =
getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator);
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
@ -2154,7 +2158,7 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
clearupQueryTableDataCond(&pTableScanInfo->cond);
for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {
tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
tsdbCleanupReadHandle(reader);
}
taosArrayDestroy(pTableScanInfo->dataReaders);
@ -2170,22 +2174,22 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pTableScanInfo->pSortInfo);
}
typedef struct STableMergeScanExecInfo {
SFileBlockLoadRecorder blockRecorder;
SSortExecInfo sortExecInfo;
} STableMergeScanExecInfo;
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
ASSERT(pOptr != NULL);
//TODO: merge these two info into one struct
SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
STableScanInfo* pTableScanInfo = pOptr->info;
*pRecorder = pTableScanInfo->readRecorder;
*pOptrExplain = pRecorder;
*len = sizeof(SFileBlockLoadRecorder);
// TODO: merge these two info into one struct
STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
STableMergeScanInfo* pInfo = pOptr->info;
execInfo->blockRecorder = pInfo->readRecorder;
execInfo->sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
*pOptrExplain = execInfo;
*len = sizeof(STableMergeScanExecInfo);
STableMergeScanInfo* pOperatorInfo = (STableMergeScanInfo*)pOptr->info;
*pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle);
*pOptrExplain = pInfo;
*len = sizeof(SSortExecInfo);
return TSDB_CODE_SUCCESS;
}