diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 646477855b..99a42d6ad6 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -657,7 +657,7 @@ SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfDownstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SInterval* pInterval, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6db16193f5..88542eabfd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -211,6 +211,7 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); +static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput); static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput); static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput); @@ -5224,9 +5225,8 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { return TSDB_CODE_SUCCESS; } -// TODO handle the error SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { - SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); + SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5235,11 +5235,9 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock size_t numOfSources = LIST_LENGTH(pSources); pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode)); - if (pInfo->pSources == NULL) { - tfree(pInfo); - tfree(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); + if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) { + goto _error; } for(int32_t i = 0; i < numOfSources; ++i) { @@ -5247,16 +5245,6 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock taosArrayPush(pInfo->pSources, pNode); } - pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); - if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) { - tfree(pInfo); - tfree(pOperator); - taosArrayDestroy(pInfo->pSources); - taosArrayDestroy(pInfo->pSourceDataInfo); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - int32_t code = initDataSource(numOfSources, pInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -5307,12 +5295,12 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock _error: if (pInfo != NULL) { - destroyExchangeOperatorInfo(pInfo, 0); + destroyExchangeOperatorInfo(pInfo, numOfSources); } tfree(pInfo); tfree(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -5351,7 +5339,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, tfree(pInfo); tfree(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return NULL; } @@ -5706,7 +5694,7 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) { } static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput); -static void clearupAggSup(SAggSupporter* pAggSup); +static void cleanupAggSup(SAggSupporter* pAggSup); static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*) param; @@ -5718,7 +5706,7 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { } blockDataDestroy(pInfo->binfo.pRes); - clearupAggSup(&pInfo->aggSup); + cleanupAggSup(&pInfo->aggSup); } static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { @@ -6170,7 +6158,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { tfree(pInfo); - + tfree(pOperator); terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; return NULL; } @@ -7132,7 +7120,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t n return TSDB_CODE_SUCCESS; } -static void clearupAggSup(SAggSupporter* pAggSup) { +static void cleanupAggSup(SAggSupporter* pAggSup) { tfree(pAggSup->keyBuf); taosHashCleanup(pAggSup->pResultRowHashTable); taosHashCleanup(pAggSup->pResultRowListSet); @@ -7147,6 +7135,9 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SExprInfo* pExprInfo, int32_ doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols); pInfo->pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); + if (pInfo->pTableQueryInfo == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } int32_t index = 0; for(int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pGroupList); ++i) { @@ -7170,14 +7161,20 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SExprInfo* pExprInfo, int32_ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } - int32_t numOfRows = 1; //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); + int32_t numOfRows = 1; + int32_t code = initAggInfo(pInfo, pExprInfo, numOfCols, numOfRows, pResultBlock, pTableGroupInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } - initAggInfo(pInfo, pExprInfo, numOfCols, numOfRows, pResultBlock, pTableGroupInfo); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG; pOperator->blockingOptr = true; @@ -7190,9 +7187,19 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->_openFn = doOpenAggregateOptr; pOperator->getNextFn = getAggregateResult; pOperator->closeFn = destroyAggOperatorInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } return pOperator; + _error: + destroyAggOperatorInfo(pInfo, numOfCols); + tfree(pInfo); + tfree(pOperator); + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + return NULL; } static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { @@ -7205,33 +7212,42 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } -static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { +void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param; doDestroyBasicInfo(pInfo, numOfOutput); } -static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) { + +void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); tfree(pInfo->prevData); } -static void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { + +void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } -static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { +void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) { + STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) param; + doDestroyBasicInfo(&pInfo->binfo, numOfOutput); + cleanupAggSup(&pInfo->aggSup); + destroyDiskbasedBuf(pInfo->pResultBuf); +} + +void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } -static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { +void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pRes = blockDataDestroy(pInfo->pRes); tfree(pInfo->p); } -static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { +void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); tfree(pInfo->prevData); @@ -7316,14 +7332,20 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprI SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo)); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } pInfo->binfo.pRes = pResBlock; pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset); + if (pInfo->binfo.pCtx == NULL) { + goto _error; + } // initResultRowInfo(&pBInfo->resultRowInfo, 8); // setFunctionResultOutput(pBInfo, MAIN_SCAN); - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->blockingOptr = false; @@ -7338,8 +7360,15 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p pOperator->pTaskInfo = pTaskInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_OUT_OF_MEMORY) { + goto _error; + } return pOperator; + + _error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + return NULL; } SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) { @@ -7375,43 +7404,51 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3 return 0; } -SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfDownstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) { + ASSERT(numOfDownstream == 1); SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); - pInfo->limit = *pLimit; - pInfo->currentOffset = pLimit->offset; - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + pInfo->limit = *pLimit; + pInfo->currentOffset = pLimit->offset; pOperator->name = "LimitOperator"; // pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LIMIT; pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; + pOperator->_openFn = operatorDummyOpenFn; pOperator->getNextFn = doLimit; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; + _error: + tfree(pInfo); + tfree(pOperator); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; } SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SInterval* pInterval, SExecTaskInfo* pTaskInfo) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); - - doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pInfo->order = TSDB_ORDER_ASC; pInfo->precision = TSDB_TIME_PRECISION_MICRO; pInfo->win = pTaskInfo->window; pInfo->interval = *pInterval; - int32_t code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/"); + int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols); + code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/"); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "TimeIntervalAggOperator"; // pOperator->operatorType = OP_TimeWindow; pOperator->blockingOptr = true; @@ -7421,11 +7458,19 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->pTaskInfo = pTaskInfo; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->getNextFn = doIntervalAgg; - pOperator->closeFn = destroyBasicOperatorInfo; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doIntervalAgg; + pOperator->closeFn = destroyIntervalOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); return pOperator; + + _error: + destroyIntervalOperatorInfo(pInfo, numOfCols); + tfree(pInfo); + tfree(pOperator); + pTaskInfo->code = code; + return NULL; } SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { @@ -7478,15 +7523,21 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { SSessionAggOperatorInfo* pInfo = calloc(1, sizeof(SSessionAggOperatorInfo)); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } - doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols); - pInfo->binfo.pRes = pResBlock; initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - pInfo->prevTs = INT64_MIN; - pInfo->reptScan = false; - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - + pInfo->binfo.pRes = pResBlock; + pInfo->prevTs = INT64_MIN; + pInfo->reptScan = false; pOperator->name = "SessionWindowAggOperator"; // pOperator->operatorType = OP_SessionWindow; pOperator->blockingOptr = true; @@ -7499,8 +7550,18 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo pOperator->closeFn = destroySWindowOperatorInfo; pOperator->pTaskInfo = pTaskInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); return pOperator; + + _error: + if (pInfo != NULL) { + destroySWindowOperatorInfo(pInfo, numOfCols); + } + + tfree(pInfo); + tfree(pOperator); + pTaskInfo->code = code; + return NULL; } SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {