diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 7d09be3300..0df676c6e2 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -182,6 +182,7 @@ int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo); void destroyOperator(SOperatorInfo* pOperator); +void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** stream, int32_t num); int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index fe82e0eb62..f0e0f81cf5 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -137,22 +137,14 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - } + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); if (pInfo != NULL) { destroyAggOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 1d72b0bb58..28b2c22053 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -378,20 +378,14 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyCountWindowOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index a75bfb8f4b..8058fa9afe 100644 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -1006,14 +1006,14 @@ int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numO NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyDynQueryCtrlOperator(pInfo); } - taosMemoryFree(pOperator); + destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 0b5fd074b0..591590a261 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -140,20 +140,14 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyEWindowOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 4315624d97..120dcbc205 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -444,7 +444,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 4ae3226f48..246a5e2a6d 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -511,26 +511,21 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - } + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + if (pInfo != NULL) { destroyFillOperatorInfo(pInfo); } - + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } return code; } diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 9b213487ed..f0e0894bd2 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -1510,20 +1510,14 @@ int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfD qTrace("new group cache operator, maxCacheSize:%" PRId64 ", globalGrp:%d, batchFetch:%d", pInfo->maxCacheSize, pInfo->globalGrp, pInfo->batchFetch); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyGroupCacheOperator(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && pDownstream != NULL && (*pDownstream) != NULL) { - destroyOperator(*pDownstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 76d592a9b5..9b46db609f 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -600,22 +600,12 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: + if (pInfo != NULL) destroyGroupOperatorInfo(pInfo); + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; - if (pInfo != NULL) { - destroyGroupOperatorInfo(pInfo); - } - - if (pOperator) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } - return code; } @@ -1234,20 +1224,14 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyPartitionOperatorInfo(pInfo); } pTaskInfo->code = code; - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); TAOS_RETURN(code); } @@ -1783,18 +1767,12 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: pTaskInfo->code = code; if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; } diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 55620defba..f253aefe95 100644 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -1228,15 +1228,14 @@ int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDow qDebug("create hash Join operator done"); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _return: if (pInfo != NULL) { destroyHashJoinOperator(pInfo); } - - taosMemoryFree(pOperator); + destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 808aac66c2..14f3a08e17 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1869,6 +1869,7 @@ int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDo SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); + int32_t oldNum = numOfDownstream; bool newDownstreams = false; int32_t code = TSDB_CODE_SUCCESS; SOperatorInfo* pOperator = NULL; @@ -1921,8 +1922,7 @@ _return: if (newDownstreams) { taosMemoryFree(pDownstream); } - - taosMemoryFree(pOperator); + destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum); pTaskInfo->code = code; return code; diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 9e0ad5f497..c12bfd8798 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -649,14 +649,13 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyMultiwayMergeOperatorInfo(pInfo); } - pTaskInfo->code = code; - taosMemoryFree(pOperator); + destroyOperatorAndDownstreams(pOperator, downStreams, numStreams); return code; } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index fc52b97388..3f48d0f0a8 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -633,7 +633,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else { code = TSDB_CODE_INVALID_PARA; pTaskInfo->code = code; + for (int32_t i = 0; i < size; ++i) { + destroyOperator(ops[i]); + } taosMemoryFree(ops); + qError("invalid operator type %d", type); return code; } @@ -672,6 +676,23 @@ void destroyOperator(SOperatorInfo* pOperator) { taosMemoryFreeClear(pOperator); } +void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** downstreams, int32_t num) { + if (downstreams != NULL) { + for (int i = 0; i < num; i++) { + destroyOperator(downstreams[i]); + } + } + + if (pOperator != NULL) { + pOperator->info = NULL; + if (pOperator->pDownstream != NULL) { + taosMemoryFreeClear(pOperator->pDownstream); + pOperator->pDownstream = NULL; + } + destroyOperator(pOperator); + } +} + int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) { SExplainExecInfo execInfo = {0}; SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 7e06c083ed..a9ba57e1d4 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -177,17 +177,11 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyProjectOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -525,17 +519,11 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 557794a062..b63fe1198d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1528,7 +1528,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa // for non-blocking operator, the open cost is always 0 pOperator->cost.openCost = 0; *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index fb4b61c7a8..6083cbdcf8 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -155,20 +155,13 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroySortOperatorInfo(pInfo); } - - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -841,19 +834,13 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: pTaskInfo->code = code; if (pInfo != NULL) { destroyGroupSortOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); return code; } diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 54ad12cff0..4f11afd35a 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -920,20 +920,14 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyStreamCountAggOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index a8e14bce68..d7519d90e9 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -982,17 +982,11 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 3a6d0c709c..9a66f6d688 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1451,20 +1451,13 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); - } + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 6a1a5942d6..8d5aa7104f 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2041,17 +2041,11 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -3875,20 +3869,13 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyStreamSessionAggOperatorInfo(pInfo); } - - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; @@ -4091,6 +4078,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys SOperatorInfo* pOperator = NULL; code = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, &pOperator); if (pOperator == NULL || code != 0) { + downstream = NULL; QUERY_CHECK_CODE(code, lino, _error); } @@ -4152,9 +4140,6 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -5052,17 +5037,11 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; @@ -5398,17 +5377,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 1b8e6709d1..d57a8c7c5b 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1192,22 +1192,17 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN // int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp); code = appendDownstream(pOperator, &downstream, 1); + QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 024e0393f0..5499fa3026 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1428,19 +1428,13 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyIntervalOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -1703,20 +1697,14 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyStateWindowOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -1805,17 +1793,11 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroySWindowOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -2122,17 +2104,11 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -2462,19 +2438,12 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pMergeIntervalInfo != NULL) { destroyMergeIntervalOperatorInfo(pMergeIntervalInfo); } - - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index cbc7ca77bb..f5712d135b 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -5221,7 +5221,7 @@ int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options) FLT_ERR_JRET(fltOptimizeNodes(info, &info->sclCtx.node, &stat)); } - return code; + return TSDB_CODE_SUCCESS; _return: