fix(query): reset the page id.
This commit is contained in:
parent
e5f5d3710d
commit
dba4188f8a
|
@ -3434,6 +3434,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
|
|||
|
||||
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
|
||||
const char* pKey) {
|
||||
int32_t code = 0;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
|
||||
pAggSup->currentPageId = -1;
|
||||
|
@ -3450,18 +3451,18 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
|
|||
getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
|
||||
|
||||
if (!osTempSpaceAvailable()) {
|
||||
terrno = TSDB_CODE_NO_AVAIL_DISK;
|
||||
qError("Init stream agg supporter failed since %s", terrstr(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("Create agg result buf failed since %s", tstrerror(code));
|
||||
code = TSDB_CODE_NO_AVAIL_DISK;
|
||||
qError("Init stream agg supporter failed since %s, %s", terrstr(code), pKey);
|
||||
return code;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
|
||||
return code;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void cleanupAggSup(SAggSupporter* pAggSup) {
|
||||
|
|
|
@ -1828,12 +1828,6 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
|
|||
return needed;
|
||||
}
|
||||
|
||||
void increaseTs(SqlFunctionCtx* pCtx) {
|
||||
if (pCtx[0].pExpr->pExpr->_function.pFunctNode->funcType == FUNCTION_TYPE_WSTART) {
|
||||
// pCtx[0].increase = true;
|
||||
}
|
||||
}
|
||||
|
||||
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup) {
|
||||
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
// Todo(liuyao) support partition by column
|
||||
|
@ -1895,7 +1889,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
|
||||
if (isStream) {
|
||||
ASSERT(numOfCols > 0);
|
||||
increaseTs(pSup->pCtx);
|
||||
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
|
||||
}
|
||||
|
||||
|
@ -3050,6 +3043,7 @@ static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo)
|
|||
tSimpleHashClear(pInfo->aggSup.pResultRowHashTable);
|
||||
clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||
pInfo->aggSup.currentPageId = -1;
|
||||
}
|
||||
|
||||
static void clearSpecialDataBlock(SSDataBlock* pBlock) {
|
||||
|
@ -3420,7 +3414,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
ASSERT(numOfCols > 0);
|
||||
increaseTs(pOperator->exprSupp.pCtx);
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||
|
@ -3451,6 +3444,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
// semi interval operator does not catch result
|
||||
pInfo->isFinal = false;
|
||||
pOperator->name = "StreamSemiIntervalOperator";
|
||||
ASSERT(pInfo->aggSup.currentPageId == -1);
|
||||
}
|
||||
|
||||
if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
|
||||
|
@ -3563,7 +3557,6 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo*
|
|||
}
|
||||
|
||||
ASSERT(numOfCols > 0);
|
||||
increaseTs(pSup->pCtx);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue