fix: free operator

This commit is contained in:
xsren 2024-08-29 19:43:59 +08:00
parent fec49abf33
commit d43df78537
23 changed files with 94 additions and 234 deletions

View File

@ -182,6 +182,7 @@ int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t
int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo);
void destroyOperator(SOperatorInfo* pOperator);
void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** stream, int32_t num);
int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder);

View File

@ -137,22 +137,14 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
if (pInfo != NULL) {
destroyAggOperatorInfo(pInfo);
}
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}

View File

@ -378,20 +378,14 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) {
destroyCountWindowOperatorInfo(pInfo);
}
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}

View File

@ -1006,14 +1006,14 @@ int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numO
NULL, optrDefaultGetNextExtFn, NULL);
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) {
destroyDynQueryCtrlOperator(pInfo);
}
taosMemoryFree(pOperator);
destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
pTaskInfo->code = code;
return code;
}

View File

@ -140,20 +140,14 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) {
destroyEWindowOperatorInfo(pInfo);
}
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}

View File

@ -444,7 +444,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (code != TSDB_CODE_SUCCESS) {

View File

@ -511,26 +511,21 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
if (pInfo != NULL) {
destroyFillOperatorInfo(pInfo);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
return code;
}

View File

@ -1510,20 +1510,14 @@ int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfD
qTrace("new group cache operator, maxCacheSize:%" PRId64 ", globalGrp:%d, batchFetch:%d", pInfo->maxCacheSize, pInfo->globalGrp, pInfo->batchFetch);
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) {
destroyGroupCacheOperator(pInfo);
}
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && pDownstream != NULL && (*pDownstream) != NULL) {
destroyOperator(*pDownstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
pTaskInfo->code = code;
return code;
}

View File

@ -600,22 +600,12 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) destroyGroupOperatorInfo(pInfo);
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
if (pInfo != NULL) {
destroyGroupOperatorInfo(pInfo);
}
if (pOperator) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
return code;
}
@ -1229,20 +1219,14 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) {
destroyPartitionOperatorInfo(pInfo);
}
pTaskInfo->code = code;
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
TAOS_RETURN(code);
}
@ -1778,18 +1762,12 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
pTaskInfo->code = code;
if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo);
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
}

View File

@ -1228,15 +1228,14 @@ int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDow
qDebug("create hash Join operator done");
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_return:
if (pInfo != NULL) {
destroyHashJoinOperator(pInfo);
}
taosMemoryFree(pOperator);
destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
pTaskInfo->code = code;
return code;
}

View File

@ -1869,6 +1869,7 @@ int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDo
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
int32_t oldNum = numOfDownstream;
bool newDownstreams = false;
int32_t code = TSDB_CODE_SUCCESS;
SOperatorInfo* pOperator = NULL;
@ -1921,8 +1922,7 @@ _return:
if (newDownstreams) {
taosMemoryFree(pDownstream);
}
taosMemoryFree(pOperator);
destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum);
pTaskInfo->code = code;
return code;

View File

@ -649,14 +649,13 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) {
destroyMultiwayMergeOperatorInfo(pInfo);
}
pTaskInfo->code = code;
taosMemoryFree(pOperator);
destroyOperatorAndDownstreams(pOperator, downStreams, numStreams);
return code;
}

View File

@ -633,7 +633,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else {
code = TSDB_CODE_INVALID_PARA;
pTaskInfo->code = code;
for (int32_t i = 0; i < size; ++i) {
destroyOperator(ops[i]);
}
taosMemoryFree(ops);
qError("invalid operator type %d", type);
return code;
}
@ -672,6 +676,23 @@ void destroyOperator(SOperatorInfo* pOperator) {
taosMemoryFreeClear(pOperator);
}
void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** downstreams, int32_t num) {
if (downstreams != NULL) {
for (int i = 0; i < num; i++) {
destroyOperator(downstreams[i]);
}
}
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream != NULL) {
taosMemoryFreeClear(pOperator->pDownstream);
pOperator->pDownstream = NULL;
}
destroyOperator(pOperator);
}
}
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
SExplainExecInfo execInfo = {0};
SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);

View File

@ -177,17 +177,11 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode*
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) destroyProjectOperatorInfo(pInfo);
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}
@ -525,17 +519,11 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo);
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}

View File

@ -1528,7 +1528,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
// for non-blocking operator, the open cost is always 0
pOperator->cost.openCost = 0;
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) {

View File

@ -155,20 +155,13 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) {
destroySortOperatorInfo(pInfo);
}
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}
@ -841,19 +834,13 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
pTaskInfo->code = code;
if (pInfo != NULL) {
destroyGroupSortOperatorInfo(pInfo);
}
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
return code;
}

View File

@ -920,20 +920,14 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) {
destroyStreamCountAggOperatorInfo(pInfo);
}
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;

View File

@ -982,17 +982,11 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo);
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;

View File

@ -1451,20 +1451,13 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
}
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo);
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}

View File

@ -2024,17 +2024,11 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo);
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}
@ -3858,20 +3852,13 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) {
destroyStreamSessionAggOperatorInfo(pInfo);
}
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
@ -4074,6 +4061,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys
SOperatorInfo* pOperator = NULL;
code = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, &pOperator);
if (pOperator == NULL || code != 0) {
downstream = NULL;
QUERY_CHECK_CODE(code, lino, _error);
}
@ -4135,9 +4123,6 @@ _error:
}
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code;
@ -5035,17 +5020,11 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo);
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
@ -5376,17 +5355,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo);
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}

View File

@ -1192,22 +1192,17 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN
// int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
code = appendDownstream(pOperator, &downstream, 1);
QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo);
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}

View File

@ -1428,19 +1428,13 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) {
destroyIntervalOperatorInfo(pInfo);
}
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}
@ -1703,20 +1697,14 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) {
destroyStateWindowOperatorInfo(pInfo);
}
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}
@ -1805,17 +1793,11 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}
@ -2122,17 +2104,11 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}
@ -2462,19 +2438,12 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
}
*pOptrInfo = pOperator;
return code;
return TSDB_CODE_SUCCESS;
_error:
if (pMergeIntervalInfo != NULL) {
destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
}
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
return code;
}

View File

@ -5221,7 +5221,7 @@ int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options)
FLT_ERR_JRET(fltOptimizeNodes(info, &info->sclCtx.node, &stat));
}
return code;
return TSDB_CODE_SUCCESS;
_return: