fix(query): reset the output buffer when results have been produced.
This commit is contained in:
parent
7f03ffcd75
commit
0bb1dc9d1f
|
@ -88,6 +88,7 @@ struct SqlFunctionCtx;
|
||||||
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
void initResultRowInfo(SResultRowInfo* pResultRowInfo);
|
void initResultRowInfo(SResultRowInfo* pResultRowInfo);
|
||||||
void closeResultRow(SResultRow* pResultRow);
|
void closeResultRow(SResultRow* pResultRow);
|
||||||
|
void resetResultRow(SResultRow* pResultRow, size_t entrySize);
|
||||||
|
|
||||||
struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset);
|
struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset);
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,17 @@ void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
|
||||||
|
|
||||||
void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; }
|
void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; }
|
||||||
|
|
||||||
|
void resetResultRow(SResultRow* pResultRow, size_t entrySize) {
|
||||||
|
pResultRow->numOfRows = 0;
|
||||||
|
pResultRow->closed = false;
|
||||||
|
pResultRow->endInterp = false;
|
||||||
|
pResultRow->startInterp = false;
|
||||||
|
|
||||||
|
if (entrySize > 0) {
|
||||||
|
memset(pResultRow->pEntryInfo, 0, entrySize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO refactor: use macro
|
// TODO refactor: use macro
|
||||||
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
|
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
|
||||||
assert(index >= 0 && offset != NULL);
|
assert(index >= 0 && offset != NULL);
|
||||||
|
|
|
@ -1293,8 +1293,9 @@ static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SR
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
// todo refactor. SResultRow has direct pointer in miainfo
|
||||||
SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
|
||||||
|
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
||||||
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
|
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
|
||||||
SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
|
SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
|
||||||
|
|
||||||
|
|
|
@ -4886,14 +4886,8 @@ static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWind
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// set time window for current result ,todo extract method
|
// set time window for current result
|
||||||
(*pResult)->win = (*win);
|
(*pResult)->win = (*win);
|
||||||
(*pResult)->numOfRows = 0;
|
|
||||||
(*pResult)->closed = false;
|
|
||||||
(*pResult)->endInterp = false;
|
|
||||||
(*pResult)->startInterp = false;
|
|
||||||
memset((*pResult)->pEntryInfo, 0, pAggSup->resultRowSize - sizeof(SResultRow));
|
|
||||||
|
|
||||||
setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
|
setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -4916,6 +4910,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
|
||||||
if (miaInfo->curTs != INT64_MIN) {
|
if (miaInfo->curTs != INT64_MIN) {
|
||||||
if (ts != miaInfo->curTs) {
|
if (ts != miaInfo->curTs) {
|
||||||
finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
|
finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
|
||||||
|
resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
|
||||||
miaInfo->curTs = ts;
|
miaInfo->curTs = ts;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -4944,6 +4939,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
|
||||||
pBlock->info.rows, pSup->numOfExprs);
|
pBlock->info.rows, pSup->numOfExprs);
|
||||||
|
|
||||||
finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
|
finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
|
||||||
|
resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
|
||||||
miaInfo->curTs = tsCols[currPos];
|
miaInfo->curTs = tsCols[currPos];
|
||||||
|
|
||||||
currWin.skey = miaInfo->curTs;
|
currWin.skey = miaInfo->curTs;
|
||||||
|
|
Loading…
Reference in New Issue