refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2022-08-02 15:43:45 +08:00
parent cbed4713af
commit 258d50306f
4 changed files with 63 additions and 72 deletions

View File

@ -554,8 +554,6 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo {
uint64_t groupId; // current groupId uint64_t groupId; // current groupId
int64_t curTs; // current ts int64_t curTs; // current ts
SSDataBlock* prefetchedBlock; SSDataBlock* prefetchedBlock;
bool inputBlocksFinished;
SNode* pCondition; SNode* pCondition;
} SMergeAlignedIntervalAggOperatorInfo; } SMergeAlignedIntervalAggOperatorInfo;
@ -906,7 +904,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
SExecTaskInfo* pTaskInfo); bool mergeResultBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,

View File

@ -50,7 +50,7 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
STableListInfo* pTableList = &pTaskInfo->tableqinfoList; STableListInfo* pTableList = &pTaskInfo->tableqinfoList;
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));

View File

@ -3481,10 +3481,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error; goto _error;
} }
int32_t numOfRows = 1024;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, numOfRows); initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
@ -4147,7 +4146,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo); pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pIntervalPhyNode->window.mergeDataBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
int32_t children = 0; int32_t children = 0;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);

View File

@ -4622,20 +4622,25 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
int32_t numOfOutput = pSup->numOfExprs; int32_t numOfOutput = pSup->numOfExprs;
int64_t* tsCols = extractTsCol(pBlock, iaInfo); int64_t* tsCols = extractTsCol(pBlock, iaInfo);
uint64_t tableGroupId = pBlock->info.groupId; uint64_t tableGroupId = pBlock->info.groupId;
TSKEY currTs = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
// there is an result exists // there is an result exists
if (miaInfo->curTs != INT64_MIN) { if (miaInfo->curTs != INT64_MIN) {
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
if (currTs != miaInfo->curTs) {
if (ts != miaInfo->curTs) {
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs); outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs);
miaInfo->curTs = INT64_MIN; miaInfo->curTs = ts;
} }
} else {
miaInfo->curTs = ts;
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
} }
STimeWindow win = {0}; STimeWindow win = {0};
win.skey = currTs; win.skey = miaInfo->curTs;
win.ekey = win.ekey =
taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1; taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1;
@ -4646,12 +4651,11 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
miaInfo->curTs = win.skey;
int32_t currPos = startPos; int32_t currPos = startPos;
STimeWindow currWin = win; STimeWindow currWin = win;
while (++currPos < pBlock->info.rows) { while (++currPos < pBlock->info.rows) {
if (tsCols[currPos] == currTs) { if (tsCols[currPos] == miaInfo->curTs) {
continue; continue;
} }
@ -4659,11 +4663,10 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs);
miaInfo->curTs = INT64_MIN; miaInfo->curTs = tsCols[currPos];
currTs = tsCols[currPos]; currWin.skey = miaInfo->curTs;
currWin.skey = currTs;
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit,
iaInfo->interval.precision) - 1; iaInfo->interval.precision) - 1;
@ -4680,14 +4683,6 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
if (currPos >= pBlock->info.rows) {
// we need to see next block if exists
} else {
ASSERT(0);
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
miaInfo->curTs = INT64_MIN;
}
} }
static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
@ -4703,9 +4698,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = iaInfo->binfo.pRes; SSDataBlock* pRes = iaInfo->binfo.pRes;
blockDataCleanup(pRes); blockDataCleanup(pRes);
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
if (!miaInfo->inputBlocksFinished) {
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
int32_t scanFlag = MAIN_SCAN; int32_t scanFlag = MAIN_SCAN;
@ -4715,8 +4708,9 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
pBlock = downstream->fpSet.getNextFn(downstream); pBlock = downstream->fpSet.getNextFn(downstream);
} else { } else {
pBlock = miaInfo->prefetchedBlock; pBlock = miaInfo->prefetchedBlock;
miaInfo->groupId = pBlock->info.groupId;
miaInfo->prefetchedBlock = NULL; miaInfo->prefetchedBlock = NULL;
miaInfo->groupId = pBlock->info.groupId;
} }
if (pBlock == NULL) { if (pBlock == NULL) {
@ -4728,7 +4722,6 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
} }
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
miaInfo->inputBlocksFinished = true;
break; break;
} }
@ -4747,6 +4740,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag); getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag);
setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->inputOrder, scanFlag, true); setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->inputOrder, scanFlag, true);
doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
doFilter(miaInfo->pCondition, pRes, NULL); doFilter(miaInfo->pCondition, pRes, NULL);
if (pRes->info.rows >= pOperator->resultInfo.capacity) { if (pRes->info.rows >= pOperator->resultInfo.capacity) {
break; break;
@ -4754,8 +4748,6 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
} }
pRes->info.groupId = miaInfo->groupId; pRes->info.groupId = miaInfo->groupId;
}
miaInfo->hasGroupId = false; miaInfo->hasGroupId = false;
size_t rows = pRes->info.rows; size_t rows = pRes->info.rows;
@ -4809,6 +4801,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
} }
initResultRowInfo(&iaInfo->binfo.resultRowInfo); initResultRowInfo(&iaInfo->binfo.resultRowInfo);
blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
pOperator->name = "TimeMergeAlignedIntervalAggOperator"; pOperator->name = "TimeMergeAlignedIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL;
@ -5076,7 +5069,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
SExecTaskInfo* pTaskInfo) { bool mergeBlock, SExecTaskInfo* pTaskInfo) {
SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo)); SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (miaInfo == NULL || pOperator == NULL) { if (miaInfo == NULL || pOperator == NULL) {
@ -5090,6 +5083,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
iaInfo->inputOrder = TSDB_ORDER_ASC; iaInfo->inputOrder = TSDB_ORDER_ASC;
iaInfo->interval = *pInterval; iaInfo->interval = *pInterval;
iaInfo->execModel = pTaskInfo->execModel; iaInfo->execModel = pTaskInfo->execModel;
iaInfo->binfo.mergeResultBlock = mergeBlock;
iaInfo->primaryTsIndex = primaryTsSlotId; iaInfo->primaryTsIndex = primaryTsSlotId;