diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 19daa7e96a..4ffa80d468 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3434,6 +3434,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey) { + int32_t code = 0; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pAggSup->currentPageId = -1; @@ -3450,18 +3451,18 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); if (!osTempSpaceAvailable()) { - terrno = TSDB_CODE_NO_AVAIL_DISK; - qError("Init stream agg supporter failed since %s", terrstr(terrno)); - return terrno; - } - - int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir); - if (code != TSDB_CODE_SUCCESS) { - qError("Create agg result buf failed since %s", tstrerror(code)); + code = TSDB_CODE_NO_AVAIL_DISK; + qError("Init stream agg supporter failed since %s, %s", terrstr(code), pKey); return code; } - return TSDB_CODE_SUCCESS; + code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir); + if (code != TSDB_CODE_SUCCESS) { + qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey); + return code; + } + + return code; } void cleanupAggSup(SAggSupporter* pAggSup) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9ea6b4c42f..c28bc7e9e8 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1828,12 +1828,6 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt return needed; } -void increaseTs(SqlFunctionCtx* pCtx) { - if (pCtx[0].pExpr->pExpr->_function.pFunctNode->funcType == FUNCTION_TYPE_WSTART) { -// pCtx[0].increase = true; - } -} - void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup) { if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { // Todo(liuyao) support partition by column @@ -1895,7 +1889,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* if (isStream) { ASSERT(numOfCols > 0); - increaseTs(pSup->pCtx); initStreamFunciton(pSup->pCtx, pSup->numOfExprs); } @@ -3050,6 +3043,7 @@ static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) tSimpleHashClear(pInfo->aggSup.pResultRowHashTable); clearDiskbasedBuf(pInfo->aggSup.pResultBuf); initResultRowInfo(&pInfo->binfo.resultRowInfo); + pInfo->aggSup.currentPageId = -1; } static void clearSpecialDataBlock(SSDataBlock* pBlock) { @@ -3420,7 +3414,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, initBasicInfo(&pInfo->binfo, pResBlock); ASSERT(numOfCols > 0); - increaseTs(pOperator->exprSupp.pCtx); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -3451,6 +3444,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, // semi interval operator does not catch result pInfo->isFinal = false; pOperator->name = "StreamSemiIntervalOperator"; + ASSERT(pInfo->aggSup.currentPageId == -1); } if (!IS_FINAL_OP(pInfo) || numOfChild == 0) { @@ -3563,7 +3557,6 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* } ASSERT(numOfCols > 0); - increaseTs(pSup->pCtx); return TSDB_CODE_SUCCESS; }