fix: add new duration processing to table merge scan

This commit (3f8b2e826f, parent f2b38f5a4a) was authored by shenglian zhou on 2023-12-04 10:49:40 +08:00.
2 changed files with 34 additions and 13 deletions

View File

@ -297,6 +297,8 @@ typedef struct STableMergeScanInfo {
SHashObj* mSkipTables; SHashObj* mSkipTables;
int64_t mergeLimit; int64_t mergeLimit;
SSortExecInfo sortExecInfo; SSortExecInfo sortExecInfo;
bool bNewDuration;
SSDataBlock* pNewDurationBlock;
} STableMergeScanInfo; } STableMergeScanInfo;
typedef struct STagScanFilterContext { typedef struct STagScanFilterContext {

View File

@ -3242,6 +3242,12 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
STsdbReader* reader = pInfo->base.dataReader; STsdbReader* reader = pInfo->base.dataReader;
while (true) { while (true) {
if (pInfo->pNewDurationBlock) {
SSDataBlock* pNewDurationBlock = pInfo->pNewDurationBlock;
pInfo->pNewDurationBlock = NULL;
return pNewDurationBlock;
}
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
if (code != 0) { if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
@ -3267,7 +3273,6 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
uint32_t status = 0; uint32_t status = 0;
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo)); qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
@ -3290,6 +3295,11 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
if (pInfo->bNewDuration) {
pInfo->pNewDurationBlock = pBlock;
return NULL;
}
return pBlock; return pBlock;
} }
@ -3327,7 +3337,8 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
uInfo("tableMergeScanTsdbNotifyCb, %d, %d", type, info->duration.fileSetId); uInfo("tableMergeScanTsdbNotifyCb, %d, %d", type, info->duration.fileSetId);
STableMergeScanInfo* pTmsInfo = param;
pTmsInfo->bNewDuration = true;
return; return;
} }
@ -3337,6 +3348,8 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
pInfo->bNewDuration = false;
pInfo->sortBufSize = 2048 * pInfo->bufPageSize; pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
@ -3354,7 +3367,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
ps->param = param; ps->param = param;
ps->onlyRef = false; ps->onlyRef = false;
tsortAddSource(pInfo->pSortHandle, ps); tsortAddSource(pInfo->pSortHandle, ps);
if (numOfTable == 1) { if (numOfTable == 1) {
tsortSetSingleTableMerge(pInfo->pSortHandle); tsortSetSingleTableMerge(pInfo->pSortHandle);
} else { } else {
@ -3510,17 +3523,23 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
return pBlock; return pBlock;
} else { } else {
// Data of this group are all dumped, let's try the next group if (pInfo->bNewDuration) {
stopGroupTableMergeScan(pOperator); stopDurationForGroupTableMergeScan(pOperator);
if (pInfo->tableEndIndex >= tableListSize - 1) { startDurationForGroupTableMergeScan(pOperator);
setOperatorCompleted(pOperator);
break;
}
pInfo->tableStartIndex = pInfo->tableEndIndex + 1; } else {
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId; // Data of this group are all dumped, let's try the next group
startGroupTableMergeScan(pOperator); stopGroupTableMergeScan(pOperator);
resetLimitInfoForNextGroup(&pInfo->limitInfo); if (pInfo->tableEndIndex >= tableListSize - 1) {
setOperatorCompleted(pOperator);
break;
}
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
startGroupTableMergeScan(pOperator);
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
} }
} }