refactor: start/stop duration from start/stop table merge scan

This commit is contained in:
shenglian zhou 2023-12-01 16:51:02 +08:00
parent bff88e0639
commit f2b38f5a4a
1 changed files with 50 additions and 40 deletions

View File

@ -3327,9 +3327,56 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
uInfo("tableMergeScanTsdbNotifyCb, %d, %d", type, info->duration.fileSetId);
return;
}
int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
STableMergeScanSortSourceParam *param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam));
param->pOperator = pOperator;
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = param;
ps->onlyRef = false;
tsortAddSource(pInfo->pSortHandle, ps);
if (numOfTable == 1) {
tsortSetSingleTableMerge(pInfo->pSortHandle);
} else {
code = tsortOpen(pInfo->pSortHandle);
}
return code;
}
void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
pInfo->sortExecInfo.loops += sortExecInfo.loops;
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
tsortDestroySortHandle(pInfo->pSortHandle);
pInfo->pSortHandle = NULL;
}
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -3353,43 +3400,14 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
tSimpleHashClear(pInfo->mTableNumRows);
size_t szRow = blockDataGetRowSize(pInfo->pResBlock);
// if (pInfo->mergeLimit != -1) {
// pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1,
// NULL, pTaskInfo->id.str, pInfo->mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024);
// } else
{
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
}
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
// one table has one data block
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
STableMergeScanSortSourceParam *param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam));
param->pOperator = pOperator;
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables);
pAPI->tsdReader.tsdSetDurationOrder(pInfo->base.dataReader);
pAPI->tsdReader.tsdSetSetNotifyCb(pInfo->base.dataReader, tableMergeScanTsdbNotifyCb, pInfo);
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = param;
ps->onlyRef = false;
tsortAddSource(pInfo->pSortHandle, ps);
int32_t code = TSDB_CODE_SUCCESS;
if (numOfTable == 1) {
tsortSetSingleTableMerge(pInfo->pSortHandle);
} else {
code = tsortOpen(pInfo->pSortHandle);
}
int32_t code = startDurationForGroupTableMergeScan(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);
@ -3403,21 +3421,13 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
pInfo->sortExecInfo.loops += sortExecInfo.loops;
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
stopDurationForGroupTableMergeScan(pOperator);
if (pInfo->base.dataReader != NULL) {
pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader);
pInfo->base.dataReader = NULL;
}
tsortDestroySortHandle(pInfo->pSortHandle);
pInfo->pSortHandle = NULL;
resetLimitInfoForNextGroup(&pInfo->limitInfo);
taosHashCleanup(pInfo->mSkipTables);
pInfo->mSkipTables = NULL;