fix(query): remove invalid time window close ops.
This commit is contained in:
parent
97d0e558bc
commit
7bc773b8cd
|
@ -549,7 +549,8 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo {
|
||||||
SIntervalAggOperatorInfo *intervalAggOperatorInfo;
|
SIntervalAggOperatorInfo *intervalAggOperatorInfo;
|
||||||
|
|
||||||
bool hasGroupId;
|
bool hasGroupId;
|
||||||
uint64_t groupId;
|
uint64_t groupId; // current groupId
|
||||||
|
int64_t curTs; // current ts
|
||||||
SSDataBlock* prefetchedBlock;
|
SSDataBlock* prefetchedBlock;
|
||||||
bool inputBlocksFinished;
|
bool inputBlocksFinished;
|
||||||
|
|
||||||
|
|
|
@ -1445,6 +1445,7 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo extract method with copytoSSDataBlock
|
||||||
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||||
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
|
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
|
||||||
const int32_t* rowCellOffset, SSDataBlock* pBlock,
|
const int32_t* rowCellOffset, SSDataBlock* pBlock,
|
||||||
|
|
|
@ -4589,11 +4589,10 @@ void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId,
|
static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId,
|
||||||
SSDataBlock* pResultBlock, TSKEY wstartTs) {
|
SSDataBlock* pResultBlock, TSKEY wstartTs) {
|
||||||
SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
|
SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
|
||||||
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
|
|
||||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
|
||||||
|
|
||||||
SExprSupp* pSup = &pOperatorInfo->exprSupp;
|
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
|
||||||
bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
|
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||||
|
SExprSupp* pSup = &pOperatorInfo->exprSupp;
|
||||||
|
|
||||||
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId);
|
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId);
|
||||||
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
|
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
|
||||||
|
@ -4603,8 +4602,9 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui
|
||||||
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs,
|
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs,
|
||||||
pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
|
pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
|
||||||
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||||
|
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
|
||||||
|
|
||||||
return 0;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
|
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
|
||||||
|
@ -4619,11 +4619,21 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
|
||||||
int32_t numOfOutput = pSup->numOfExprs;
|
int32_t numOfOutput = pSup->numOfExprs;
|
||||||
int64_t* tsCols = extractTsCol(pBlock, iaInfo);
|
int64_t* tsCols = extractTsCol(pBlock, iaInfo);
|
||||||
uint64_t tableGroupId = pBlock->info.groupId;
|
uint64_t tableGroupId = pBlock->info.groupId;
|
||||||
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
|
TSKEY currTs = getStartTsKey(&pBlock->info.window, tsCols);
|
||||||
SResultRow* pResult = NULL;
|
SResultRow* pResult = NULL;
|
||||||
|
|
||||||
STimeWindow win;
|
// there is an result exists
|
||||||
win.skey = blockStartTs;
|
if (miaInfo->curTs != INT64_MIN) {
|
||||||
|
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
|
||||||
|
if (currTs != miaInfo->curTs) {
|
||||||
|
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs);
|
||||||
|
}
|
||||||
|
|
||||||
|
miaInfo->curTs = INT64_MIN;
|
||||||
|
}
|
||||||
|
|
||||||
|
STimeWindow win = {0};
|
||||||
|
win.skey = currTs;
|
||||||
win.ekey =
|
win.ekey =
|
||||||
taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1;
|
taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1;
|
||||||
|
|
||||||
|
@ -4634,41 +4644,44 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
TSKEY currTs = blockStartTs;
|
int32_t currPos = startPos;
|
||||||
TSKEY currPos = startPos;
|
|
||||||
STimeWindow currWin = win;
|
STimeWindow currWin = win;
|
||||||
while (1) {
|
while (++currPos < pBlock->info.rows) {
|
||||||
++currPos;
|
|
||||||
if (currPos >= pBlock->info.rows) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (tsCols[currPos] == currTs) {
|
if (tsCols[currPos] == currTs) {
|
||||||
continue;
|
continue;
|
||||||
} else {
|
}
|
||||||
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
|
|
||||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
|
|
||||||
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
|
|
||||||
|
|
||||||
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
|
miaInfo->curTs = currWin.skey;
|
||||||
|
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
|
||||||
|
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
|
||||||
|
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
|
||||||
|
|
||||||
currTs = tsCols[currPos];
|
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
|
||||||
currWin.skey = currTs;
|
|
||||||
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit,
|
currTs = tsCols[currPos];
|
||||||
iaInfo->interval.precision) -
|
currWin.skey = currTs;
|
||||||
1;
|
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit,
|
||||||
startPos = currPos;
|
iaInfo->interval.precision) - 1;
|
||||||
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
|
|
||||||
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
|
startPos = currPos;
|
||||||
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
|
||||||
}
|
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
|
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
|
||||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
|
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
|
||||||
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
|
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
|
||||||
|
|
||||||
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
|
if (currPos >= pBlock->info.rows) {
|
||||||
|
// we need to see next block if exists
|
||||||
|
} else {
|
||||||
|
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
|
||||||
|
miaInfo->curTs = INT64_MIN;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
@ -4682,12 +4695,14 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
SSDataBlock* pRes = iaInfo->binfo.pRes;
|
SSDataBlock* pRes = iaInfo->binfo.pRes;
|
||||||
|
|
||||||
blockDataCleanup(pRes);
|
blockDataCleanup(pRes);
|
||||||
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
if (!miaInfo->inputBlocksFinished) {
|
if (!miaInfo->inputBlocksFinished) {
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
int32_t scanFlag = MAIN_SCAN;
|
int32_t scanFlag = MAIN_SCAN;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = NULL;
|
SSDataBlock* pBlock = NULL;
|
||||||
if (miaInfo->prefetchedBlock == NULL) {
|
if (miaInfo->prefetchedBlock == NULL) {
|
||||||
|
@ -4699,6 +4714,14 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
|
// close last unfinalized time window
|
||||||
|
if (miaInfo->curTs != INT64_MIN) {
|
||||||
|
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
|
||||||
|
outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs);
|
||||||
|
miaInfo->curTs = INT64_MIN;
|
||||||
|
}
|
||||||
|
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
miaInfo->inputBlocksFinished = true;
|
miaInfo->inputBlocksFinished = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -4707,7 +4730,11 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
miaInfo->hasGroupId = true;
|
miaInfo->hasGroupId = true;
|
||||||
miaInfo->groupId = pBlock->info.groupId;
|
miaInfo->groupId = pBlock->info.groupId;
|
||||||
} else if (miaInfo->groupId != pBlock->info.groupId) {
|
} else if (miaInfo->groupId != pBlock->info.groupId) {
|
||||||
|
// if there are unclosed time window, close it firstly.
|
||||||
|
ASSERT(miaInfo->curTs != INT64_MIN);
|
||||||
|
outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs);
|
||||||
miaInfo->prefetchedBlock = pBlock;
|
miaInfo->prefetchedBlock = pBlock;
|
||||||
|
miaInfo->curTs = INT64_MIN;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4722,11 +4749,8 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
pRes->info.groupId = miaInfo->groupId;
|
pRes->info.groupId = miaInfo->groupId;
|
||||||
}
|
}
|
||||||
miaInfo->hasGroupId = false;
|
|
||||||
|
|
||||||
if (miaInfo->inputBlocksFinished) {
|
miaInfo->hasGroupId = false;
|
||||||
doSetOperatorCompleted(pOperator);
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t rows = pRes->info.rows;
|
size_t rows = pRes->info.rows;
|
||||||
pOperator->resultInfo.totalRows += rows;
|
pOperator->resultInfo.totalRows += rows;
|
||||||
|
@ -4752,6 +4776,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
|
||||||
miaInfo->pCondition = pCondition;
|
miaInfo->pCondition = pCondition;
|
||||||
|
miaInfo->curTs = INT64_MIN;
|
||||||
|
|
||||||
iaInfo->win = pTaskInfo->window;
|
iaInfo->win = pTaskInfo->window;
|
||||||
iaInfo->inputOrder = TSDB_ORDER_ASC;
|
iaInfo->inputOrder = TSDB_ORDER_ASC;
|
||||||
iaInfo->interval = *pInterval;
|
iaInfo->interval = *pInterval;
|
||||||
|
|
Loading…
Reference in New Issue