From ce1b294c51213e76212d9ca49b11e8309a64e253 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 10 Aug 2023 11:29:14 +0800 Subject: [PATCH] fix: memory leak issues --- source/libs/executor/inc/executorInt.h | 13 + source/libs/executor/inc/groupcache.h | 5 +- source/libs/executor/inc/operator.h | 7 +- .../libs/executor/src/dynqueryctrloperator.c | 455 ++++++++++-------- source/libs/executor/src/exchangeoperator.c | 11 +- source/libs/executor/src/executor.c | 4 - source/libs/executor/src/executorInt.c | 137 ++++++ source/libs/executor/src/groupcacheoperator.c | 89 +++- source/libs/executor/src/mergejoinoperator.c | 13 +- source/libs/executor/src/operator.c | 35 +- source/libs/executor/src/scanoperator.c | 1 + source/libs/planner/src/planPhysiCreater.c | 2 + 12 files changed, 520 insertions(+), 252 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 143859111b..401b532a69 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -48,6 +48,14 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int typedef struct STsdbReader STsdbReader; typedef struct STqReader STqReader; + +typedef enum SOperatorParamType{ + OP_GET_PARAM = 1, + OP_NOTIFY_PARAM +} SOperatorParamType; + + + #define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0) #define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN) #define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0) @@ -741,6 +749,11 @@ void streamOpReloadState(struct SOperatorInfo* pOperator); void destroyOperatorParamValue(void* pValues); int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc); int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq); +void freeExchangeGetBasicOperatorParam(void* pParam); +void freeOperatorParam(SOperatorParam* pParam, SOperatorParamType type); +void freeResetOperatorParams(struct SOperatorInfo* pOperator, SOperatorParamType type, bool allFree); +SSDataBlock* getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam); + #ifdef __cplusplus } diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index d86388a0b4..ef2730086b 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -56,7 +56,7 @@ typedef struct SGcDownstreamCtx { SRWLatch grpLock; int64_t fetchSessionId; SArray* pNewGrpList; // SArray - SSHashObj* pVgTbHash; + SSHashObj* pVgTbHash; // SHash SHashObj* pGrpHash; SRWLatch blkLock; SSDataBlock* pBaseBlock; @@ -136,7 +136,6 @@ typedef struct SGcBlkBufInfo { } SGcBlkBufInfo; typedef struct SGcExecInfo { - int32_t downstreamNum; int64_t* pDownstreamBlkNum; } SGcExecInfo; @@ -157,13 +156,13 @@ typedef struct SGcBlkCacheInfo { } SGcBlkCacheInfo; typedef struct SGroupCacheOperatorInfo { - TdThreadMutex sessionMutex; int64_t maxCacheSize; int64_t currentBlkId; SGroupColsInfo groupColsInfo; bool globalGrp; bool grpByUid; bool batchFetch; + int32_t downstreamNum; SGcDownstreamCtx* pDownstreams; SGcBlkCacheInfo blkCache; SHashObj* pGrpHash; diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 8b13d4d222..9a07a643a5 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -20,11 +20,6 @@ extern "C" { #endif -typedef enum SOperatorParamType{ - OP_GET_PARAM = 1, - OP_NOTIFY_PARAM -} SOperatorParamType; - typedef struct SOperatorCostInfo { double openCost; double totalCost; @@ -180,7 +175,7 @@ int32_t optrDefaultBufFn(SOperatorInfo* pOperator); SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam); int32_t optrDefaultNotifyFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam); SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx); -SSDataBlock* getNextBlockFromDownstreamOnce(struct SOperatorInfo* pOperator, int32_t idx); +SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx); int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx); SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 404b0201e4..fa332222dd 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -33,224 +33,64 @@ void freeVgTableList(void* ptr) { taosArrayDestroy(*(SArray**)ptr); } +static void destroyStbJoinTableList(SStbJoinTableList* pListHead) { + SStbJoinTableList* pNext = NULL; + + while (pListHead) { + taosMemoryFree(pListHead->pLeftVg); + taosMemoryFree(pListHead->pLeftUid); + taosMemoryFree(pListHead->pRightVg); + taosMemoryFree(pListHead->pRightUid); + pNext = pListHead->pNext; + taosMemoryFree(pListHead); + pListHead = pNext; + } +} + +static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) { + qError("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, + pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, + pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum); + + if (pStbJoin->basic.batchFetch) { + if (pStbJoin->ctx.prev.leftHash) { + tSimpleHashSetFreeFp(pStbJoin->ctx.prev.leftHash, freeVgTableList); + tSimpleHashCleanup(pStbJoin->ctx.prev.leftHash); + } + if (pStbJoin->ctx.prev.rightHash) { + tSimpleHashSetFreeFp(pStbJoin->ctx.prev.rightHash, freeVgTableList); + tSimpleHashCleanup(pStbJoin->ctx.prev.rightHash); + } + } else { + if (pStbJoin->ctx.prev.leftCache) { + tSimpleHashCleanup(pStbJoin->ctx.prev.leftCache); + } + if (pStbJoin->ctx.prev.rightCache) { + tSimpleHashCleanup(pStbJoin->ctx.prev.rightCache); + } + if (pStbJoin->ctx.prev.onceTable) { + tSimpleHashCleanup(pStbJoin->ctx.prev.onceTable); + } + } + + destroyStbJoinTableList(pStbJoin->ctx.prev.pListHead); +} static void destroyDynQueryCtrlOperator(void* param) { SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param; - qError("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, - pDyn->stbJoin.execInfo.prevBlkNum, pDyn->stbJoin.execInfo.prevBlkRows, pDyn->stbJoin.execInfo.postBlkNum, - pDyn->stbJoin.execInfo.postBlkRows, pDyn->stbJoin.execInfo.leftCacheNum, pDyn->stbJoin.execInfo.rightCacheNum); - if (pDyn->stbJoin.basic.batchFetch) { - if (pDyn->stbJoin.ctx.prev.leftHash) { - tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.leftHash, freeVgTableList); - tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.leftHash); - } - if (pDyn->stbJoin.ctx.prev.rightHash) { - tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.rightHash, freeVgTableList); - tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightHash); - } - } else { - if (pDyn->stbJoin.ctx.prev.leftCache) { - tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.leftCache); - } - if (pDyn->stbJoin.ctx.prev.rightCache) { - tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightCache); - } - if (pDyn->stbJoin.ctx.prev.onceTable) { - tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.onceTable); - } + switch (pDyn->qType) { + case DYN_QTYPE_STB_HASH: + destroyStbJoinDynCtrlInfo(&pDyn->stbJoin); + break; + default: + qError("unsupported dynamic query ctrl type: %d", pDyn->qType); + break; } taosMemoryFreeClear(param); } -static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) { - *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); - if (NULL == *ppRes) { - return TSDB_CODE_OUT_OF_MEMORY; - } - if (pChild) { - (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES); - if (NULL == *ppRes) { - return TSDB_CODE_OUT_OF_MEMORY; - } - if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) { - return TSDB_CODE_OUT_OF_MEMORY; - } - } else { - (*ppRes)->pChildren = NULL; - } - - SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam)); - if (NULL == pGc) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1); - pGc->downstreamIdx = downstreamIdx; - pGc->vgId = vgId; - pGc->tbUid = tbUid; - pGc->needCache = needCache; - - (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE; - (*ppRes)->downstreamIdx = downstreamIdx; - (*ppRes)->value = pGc; - - return TSDB_CODE_SUCCESS; -} - - -static FORCE_INLINE int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) { - *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); - if (NULL == *ppRes) { - return TSDB_CODE_OUT_OF_MEMORY; - } - (*ppRes)->pChildren = NULL; - - SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam)); - if (NULL == pGc) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pGc->downstreamIdx = downstreamIdx; - pGc->vgId = vgId; - pGc->tbUid = tbUid; - - (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE; - (*ppRes)->downstreamIdx = downstreamIdx; - (*ppRes)->value = pGc; - - return TSDB_CODE_SUCCESS; -} - - -static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) { - *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); - if (NULL == *ppRes) { - return TSDB_CODE_OUT_OF_MEMORY; - } - (*ppRes)->pChildren = NULL; - - SExchangeOperatorParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam)); - if (NULL == pExc) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pExc->multiParams = false; - pExc->basic.vgId = *pVgId; - pExc->basic.tableSeq = true; - pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t)); - if (NULL == pExc->basic.uidList) { - taosMemoryFree(pExc); - return TSDB_CODE_OUT_OF_MEMORY; - } - taosArrayPush(pExc->basic.uidList, pUid); - - (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; - (*ppRes)->downstreamIdx = downstreamIdx; - (*ppRes)->value = pExc; - - return TSDB_CODE_SUCCESS; -} - - -static FORCE_INLINE int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) { - *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); - if (NULL == *ppRes) { - return TSDB_CODE_OUT_OF_MEMORY; - } - (*ppRes)->pChildren = NULL; - - SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam)); - if (NULL == pExc) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pExc->multiParams = true; - pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); - if (NULL == pExc->pBatchs) { - taosMemoryFree(pExc); - return TSDB_CODE_OUT_OF_MEMORY; - } - - SExchangeOperatorBasicParam basic; - basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - - int32_t iter = 0; - void* p = NULL; - while (p = tSimpleHashIterate(pVg, p, &iter)) { - int32_t* pVgId = tSimpleHashGetKey(p, NULL); - SArray* pUidList = *(SArray**)p; - basic.vgId = *pVgId; - basic.uidList = pUidList; - basic.tableSeq = false; - - tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)); - - qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList)); - } - - (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; - (*ppRes)->downstreamIdx = downstreamIdx; - (*ppRes)->value = pExc; - - return TSDB_CODE_SUCCESS; -} - - -static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam* pChild0, SOperatorParam* pChild1) { - *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); - if (NULL == *ppRes) { - return TSDB_CODE_OUT_OF_MEMORY; - } - (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES); - if (NULL == *ppRes) { - return TSDB_CODE_OUT_OF_MEMORY; - } - if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) { - return TSDB_CODE_OUT_OF_MEMORY; - } - if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam)); - if (NULL == pJoin) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pJoin->initDownstreamNum = initParam ? 2 : 0; - - (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; - (*ppRes)->value = pJoin; - - return TSDB_CODE_SUCCESS; -} - - -static FORCE_INLINE int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) { - *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); - if (NULL == *ppRes) { - return TSDB_CODE_OUT_OF_MEMORY; - } - (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES); - if (NULL == *ppRes) { - return TSDB_CODE_OUT_OF_MEMORY; - } - if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) { - return TSDB_CODE_OUT_OF_MEMORY; - } - if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; - (*ppRes)->value = NULL; - - return TSDB_CODE_SUCCESS; -} - static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) { if (batchFetch) { return true; @@ -307,7 +147,199 @@ static void updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) } } -static FORCE_INLINE int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) { + +static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) { + *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (pChild) { + (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } else { + (*ppRes)->pChildren = NULL; + } + + SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam)); + if (NULL == pGc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1); + pGc->downstreamIdx = downstreamIdx; + pGc->vgId = vgId; + pGc->tbUid = tbUid; + pGc->needCache = needCache; + + (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE; + (*ppRes)->downstreamIdx = downstreamIdx; + (*ppRes)->value = pGc; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) { + *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppRes)->pChildren = NULL; + + SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam)); + if (NULL == pGc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pGc->downstreamIdx = downstreamIdx; + pGc->vgId = vgId; + pGc->tbUid = tbUid; + + (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE; + (*ppRes)->downstreamIdx = downstreamIdx; + (*ppRes)->value = pGc; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) { + *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppRes)->pChildren = NULL; + + SExchangeOperatorParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam)); + if (NULL == pExc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pExc->multiParams = false; + pExc->basic.vgId = *pVgId; + pExc->basic.tableSeq = true; + pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t)); + if (NULL == pExc->basic.uidList) { + taosMemoryFree(pExc); + return TSDB_CODE_OUT_OF_MEMORY; + } + taosArrayPush(pExc->basic.uidList, pUid); + + (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; + (*ppRes)->downstreamIdx = downstreamIdx; + (*ppRes)->value = pExc; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) { + *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppRes)->pChildren = NULL; + + SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam)); + if (NULL == pExc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pExc->multiParams = true; + pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (NULL == pExc->pBatchs) { + taosMemoryFree(pExc); + return TSDB_CODE_OUT_OF_MEMORY; + } + tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam); + + SExchangeOperatorBasicParam basic; + basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + + int32_t iter = 0; + void* p = NULL; + while (p = tSimpleHashIterate(pVg, p, &iter)) { + int32_t* pVgId = tSimpleHashGetKey(p, NULL); + SArray* pUidList = *(SArray**)p; + basic.vgId = *pVgId; + basic.uidList = pUidList; + basic.tableSeq = false; + + tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)); + + qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList)); + *(SArray**)p = NULL; + } + + (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; + (*ppRes)->downstreamIdx = downstreamIdx; + (*ppRes)->value = pExc; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam* pChild0, SOperatorParam* pChild1) { + *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam)); + if (NULL == pJoin) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pJoin->initDownstreamNum = initParam ? 2 : 0; + + (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; + (*ppRes)->value = pJoin; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) { + *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; + (*ppRes)->value = NULL; + + return TSDB_CODE_SUCCESS; +} + + + +static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) { int32_t code = TSDB_CODE_SUCCESS; int32_t vgNum = tSimpleHashGetSize(pVg); if (vgNum <= 0 || vgNum > 1) { @@ -325,13 +357,14 @@ static FORCE_INLINE int32_t buildBatchTableScanOperatorParam(SOperatorParam** pp if (code) { return code; } + *(SArray**)p = NULL; } return TSDB_CODE_SUCCESS; } -static FORCE_INLINE int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) { +static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) { SArray* pUidList = taosArrayInit(1, sizeof(int64_t)); if (NULL == pUidList) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index d8756accab..c1bd01d2a3 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -382,7 +382,8 @@ void doDestroyExchangeOperatorInfo(void* param) { taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock); blockDataDestroy(pExInfo->pDummyBlock); - + tSimpleHashCleanup(pExInfo->pHashSources); + tsem_destroy(&pExInfo->ready); taosMemoryFreeClear(param); } @@ -438,11 +439,14 @@ int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, in STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam)); if (NULL == pScan) { + taosMemoryFreeClear(*ppRes); return TSDB_CODE_OUT_OF_MEMORY; } pScan->pUidList = taosArrayDup(pUidList, NULL); if (NULL == pScan->pUidList) { + taosMemoryFree(pScan); + taosMemoryFreeClear(*ppRes); return TSDB_CODE_OUT_OF_MEMORY; } pScan->tableSeq = tableSeq; @@ -500,6 +504,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas if (msgSize < 0) { pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pWrapper); + freeOperatorParam(req.pOpParam, OP_GET_PARAM); return pTaskInfo->code; } @@ -507,6 +512,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas if (NULL == msg) { pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pWrapper); + freeOperatorParam(req.pOpParam, OP_GET_PARAM); return pTaskInfo->code; } @@ -514,9 +520,12 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pWrapper); taosMemoryFree(msg); + freeOperatorParam(req.pOpParam, OP_GET_PARAM); return pTaskInfo->code; } + freeOperatorParam(req.pOpParam, OP_GET_PARAM); + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %p, %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId, pExchangeInfo, sourceIndex, totalSources); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fb63a95a0d..9bb24e7a7d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -502,10 +502,6 @@ bool qIsDynamicExecTask(qTaskInfo_t tinfo) { return ((SExecTaskInfo*)tinfo)->dynamicTask; } -void destroyOperatorParamValue(void* pValues) { - -} - void destroyOperatorParam(SOperatorParam* pParam) { if (NULL == pParam) { return; diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index ebec9aa94e..653386063e 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1070,3 +1070,140 @@ void streamOpReloadState(SOperatorInfo* pOperator) { downstream->fpSet.reloadStreamStateFn(downstream); } } + +void freeOperatorParamImpl(SOperatorParam* pParam, SOperatorParamType type) { + int32_t childrenNum = taosArrayGetSize(pParam->pChildren); + for (int32_t i = 0; i < childrenNum; ++i) { + SOperatorParam* pChild = taosArrayGetP(pParam->pChildren, i); + freeOperatorParam(pChild, type); + } + + taosArrayDestroy(pParam->pChildren); + + taosMemoryFree(pParam->value); + + taosMemoryFree(pParam); +} + +void freeExchangeGetBasicOperatorParam(void* pParam) { + SExchangeOperatorBasicParam* pBasic = (SExchangeOperatorBasicParam*)pParam; + taosArrayDestroy(pBasic->uidList); +} + +void freeExchangeGetOperatorParam(SOperatorParam* pParam) { + SExchangeOperatorParam* pExcParam = (SExchangeOperatorParam*)pParam->value; + if (pExcParam->multiParams) { + SExchangeOperatorBatchParam* pExcBatch = (SExchangeOperatorBatchParam*)pParam->value; + tSimpleHashCleanup(pExcBatch->pBatchs); + } else { + freeExchangeGetBasicOperatorParam(&pExcParam->basic); + } + + freeOperatorParamImpl(pParam, OP_GET_PARAM); +} + +void freeExchangeNotifyOperatorParam(SOperatorParam* pParam) { + freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); +} + +void freeGroupCacheGetOperatorParam(SOperatorParam* pParam) { + freeOperatorParamImpl(pParam, OP_GET_PARAM); +} + +void freeGroupCacheNotifyOperatorParam(SOperatorParam* pParam) { + freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); +} + +void freeMergeJoinGetOperatorParam(SOperatorParam* pParam) { + freeOperatorParamImpl(pParam, OP_GET_PARAM); +} + +void freeMergeJoinNotifyOperatorParam(SOperatorParam* pParam) { + freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); +} + +void freeTableScanGetOperatorParam(SOperatorParam* pParam) { + STableScanOperatorParam* pTableScanParam = (STableScanOperatorParam*)pParam->value; + taosArrayDestroy(pTableScanParam->pUidList); + freeOperatorParamImpl(pParam, OP_GET_PARAM); +} + +void freeTableScanNotifyOperatorParam(SOperatorParam* pParam) { + freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); +} + + +void freeOperatorParam(SOperatorParam* pParam, SOperatorParamType type) { + if (NULL == pParam) { + return; + } + + switch (pParam->opType) { + case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: + type == OP_GET_PARAM ? freeExchangeGetOperatorParam(pParam) : freeExchangeNotifyOperatorParam(pParam); + break; + case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE: + type == OP_GET_PARAM ? freeGroupCacheGetOperatorParam(pParam) : freeGroupCacheNotifyOperatorParam(pParam); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: + type == OP_GET_PARAM ? freeMergeJoinGetOperatorParam(pParam) : freeMergeJoinNotifyOperatorParam(pParam); + break; + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: + type == OP_GET_PARAM ? freeTableScanGetOperatorParam(pParam) : freeTableScanNotifyOperatorParam(pParam); + break; + default: + qError("unsupported op %d param, type %d", pParam->opType, type); + break; + } +} + +void freeResetOperatorParams(struct SOperatorInfo* pOperator, SOperatorParamType type, bool allFree) { + SOperatorParam** ppParam = NULL; + SOperatorParam*** pppDownstramParam = NULL; + switch (type) { + case OP_GET_PARAM: + ppParam = &pOperator->pOperatorGetParam; + pppDownstramParam = &pOperator->pDownstreamGetParams; + break; + case OP_NOTIFY_PARAM: + ppParam = &pOperator->pOperatorNotifyParam; + pppDownstramParam = &pOperator->pDownstreamNotifyParams; + break; + default: + return; + } + + if (*ppParam) { + freeOperatorParam(*ppParam, type); + *ppParam = NULL; + } + + if (*pppDownstramParam) { + for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { + if ((*pppDownstramParam)[i]) { + freeOperatorParam((*pppDownstramParam)[i], type); + (*pppDownstramParam)[i] = NULL; + } + } + if (allFree) { + taosMemoryFreeClear(*pppDownstramParam); + } + } +} + + +FORCE_INLINE SSDataBlock* getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam) { + if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[idx]) { + qDebug("DynOp: op %s start to get block from downstream %s", pOperator->name, pOperator->pDownstream[idx]->name); + SSDataBlock* pBlock = pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pOperator->pDownstreamGetParams[idx]); + if (clearParam) { + freeOperatorParam(pOperator->pDownstreamGetParams[idx], OP_GET_PARAM); + pOperator->pDownstreamGetParams[idx] = NULL; + } + return pBlock; + } + + return pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]); +} + + diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index e904cc7ab8..6bbc1cb327 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -73,15 +73,85 @@ static int32_t initGroupColsInfo(SGroupColsInfo* pCols, bool grpColsMayBeNull, S } static void logGroupCacheExecInfo(SGroupCacheOperatorInfo* pGrpCacheOperator) { - char* buf = taosMemoryMalloc(pGrpCacheOperator->execInfo.downstreamNum * 32 + 100); + char* buf = taosMemoryMalloc(pGrpCacheOperator->downstreamNum * 32 + 100); if (NULL == buf) { return; } int32_t offset = sprintf(buf, "groupCache exec info, downstreamBlkNum:"); - for (int32_t i = 0; i < pGrpCacheOperator->execInfo.downstreamNum; ++i) { + for (int32_t i = 0; i < pGrpCacheOperator->downstreamNum; ++i) { offset += sprintf(buf + offset, " %" PRId64 , pGrpCacheOperator->execInfo.pDownstreamBlkNum[i]); } qDebug("%s", buf); + taosMemoryFree(buf); +} + +static void freeSGcSessionCtx(void* p) { + SGcSessionCtx* pSession = p; + if (pSession->semInit) { + tsem_destroy(&pSession->waitSem); + } +} + +static void freeSGroupCacheFileInfo(void* p) { + SGroupCacheFileInfo* pFileInfo = p; + if (pFileInfo->deleted) { + return; + } + + removeGroupCacheFile(pFileInfo); +} + +static void freeSGcFileCacheCtx(SGcFileCacheCtx* pFileCtx) { + taosHashCleanup(pFileCtx->pCacheFile); +} + +static void freeSGcVgroupCtx(void* p) { + SGcVgroupCtx* pVgCtx = p; + taosArrayDestroy(pVgCtx->pTbList); + freeSGcFileCacheCtx(&pVgCtx->fileCtx); +} + +static void freeGcBlockInList(void* p) { + SSDataBlock** ppBlock = p; + if (*ppBlock) { + taosArrayDestroy((*ppBlock)->pDataBlock); + taosMemoryFree(*ppBlock); + } +} + +static void freeSGcDownstreamCtx(SGcDownstreamCtx* pCtx) { + taosArrayDestroy(pCtx->pNewGrpList); + tSimpleHashCleanup(pCtx->pVgTbHash); + taosHashCleanup(pCtx->pGrpHash); + + taosArrayDestroyEx(pCtx->pFreeBlock, freeGcBlockInList); + taosHashCleanup(pCtx->pSessions); + taosHashCleanup(pCtx->pWaitSessions); + freeSGcFileCacheCtx(&pCtx->fileCtx); +} + +static void destroyGroupCacheDownstreamCtx(SGroupCacheOperatorInfo* pGrpCacheOperator) { + if (NULL == pGrpCacheOperator->pDownstreams) { + return; + } + + for (int32_t i = 0; i < pGrpCacheOperator->downstreamNum; ++i) { + SGcDownstreamCtx* pCtx = &pGrpCacheOperator->pDownstreams[i]; + freeSGcDownstreamCtx(pCtx); + } + + taosMemoryFree(pGrpCacheOperator->pDownstreams); +} + +static void destroySGcBlkCacheInfo(SGcBlkCacheInfo* pBlkCache) { + taosHashCleanup(pBlkCache->pDirtyBlk); + + void* p = NULL; + while (p = taosHashIterate(pBlkCache->pReadBlk, p)) { + freeGcBlockInList(p); + } + + taosHashCleanup(pBlkCache->pReadBlk); } static void destroyGroupCacheOperator(void* param) { @@ -91,7 +161,12 @@ static void destroyGroupCacheOperator(void* param) { taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo); taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf); + + destroyGroupCacheDownstreamCtx(pGrpCacheOperator); + destroySGcBlkCacheInfo(&pGrpCacheOperator->blkCache); taosHashCleanup(pGrpCacheOperator->pGrpHash); + + taosMemoryFree(pGrpCacheOperator->execInfo.pDownstreamBlkNum); taosMemoryFreeClear(param); } @@ -117,6 +192,7 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, S if (NULL == pFileCtx->pCacheFile) { return TSDB_CODE_OUT_OF_MEMORY; } + taosHashSetFreeFp(pFileCtx->pCacheFile, freeSGroupCacheFileInfo); } SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); @@ -622,6 +698,7 @@ static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int if (NULL == pFileCtx->pCacheFile) { return TSDB_CODE_OUT_OF_MEMORY; } + taosHashSetFreeFp(pFileCtx->pCacheFile, freeSGroupCacheFileInfo); } SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); @@ -1091,7 +1168,6 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) { SGroupCacheOperatorInfo* pInfo = pOperator->info; - pInfo->execInfo.downstreamNum = pOperator->numOfDownstream; pInfo->execInfo.pDownstreamBlkNum = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(int64_t)); if (NULL == pInfo->execInfo.pDownstreamBlkNum) { return TSDB_CODE_OUT_OF_MEMORY; @@ -1127,6 +1203,7 @@ static void freeRemoveGroupCacheData(void* p) { taosArrayDestroy(pGroup->waitQueue); taosArrayDestroy(pGroup->blkList.pList); + taosThreadMutexDestroy(&pGroup->mutex); qTrace("group removed"); } @@ -1139,6 +1216,7 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { if (NULL == pInfo->pDownstreams) { return TSDB_CODE_OUT_OF_MEMORY; } + pInfo->downstreamNum = pOperator->numOfDownstream; for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { SGcDownstreamCtx* pCtx = &pInfo->pDownstreams[i]; @@ -1149,6 +1227,8 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { if (NULL == pCtx->pVgTbHash) { return TSDB_CODE_OUT_OF_MEMORY; } + tSimpleHashSetFreeFp(pCtx->pVgTbHash, freeSGcVgroupCtx); + if (pInfo->batchFetch) { int32_t defaultVg = 0; SGcVgroupCtx vgCtx = {0}; @@ -1172,6 +1252,7 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { if (pCtx->pSessions == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + taosHashSetFreeFp(pCtx->pSessions, freeSGcSessionCtx); pCtx->pFreeBlock = taosArrayInit(10, POINTER_BYTES); if (NULL == pCtx->pFreeBlock) { @@ -1234,7 +1315,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pInfo->maxCacheSize = 1; + pInfo->maxCacheSize = 0; pInfo->grpByUid = pPhyciNode->grpByUid; pInfo->globalGrp = pPhyciNode->globalGrp; pInfo->batchFetch = pPhyciNode->batchFetch; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 8cdc74f623..16e0c8c1d2 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -301,10 +301,15 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->rightBuildTable = tSimpleHashInit(256, hashFn); } pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { goto _error; } + if (newDownstreams) { + taosMemoryFree(pDownstream); + } + pOperator->numOfRealDownstream = newDownstreams ? 1 : 2; return pOperator; @@ -449,7 +454,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks); while (endPos == dataBlock->info.rows) { SOperatorInfo* ds = pOperator->pDownstream[whichChild]; - dataBlock = getNextBlockFromDownstream(pOperator, whichChild); + dataBlock = getNextBlockFromDownstreamRemain(pOperator, whichChild); qError("merge join %s got block for same ts, rows:%" PRId64, whichChild == 0 ? "left" : "right", dataBlock ? dataBlock->info.rows : 0); if (whichChild == 0) { pJoinInfo->leftPos = 0; @@ -648,6 +653,8 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t static void setMergeJoinDone(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; + freeOperatorParam(pOperator->pDownstreamGetParams[0], OP_GET_PARAM); + freeOperatorParam(pOperator->pDownstreamGetParams[1], OP_GET_PARAM); pOperator->pDownstreamGetParams[0] = NULL; pOperator->pDownstreamGetParams[1] = NULL; } @@ -658,7 +665,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { if (!pJoinInfo->downstreamFetchDone[0]) { - pJoinInfo->pLeft = getNextBlockFromDownstream(pOperator, 0); + pJoinInfo->pLeft = getNextBlockFromDownstreamRemain(pOperator, 0); pJoinInfo->leftPos = 0; qError("merge join left got block, rows:%" PRId64, pJoinInfo->pLeft ? pJoinInfo->pLeft->info.rows : 0); @@ -679,7 +686,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { if (!pJoinInfo->downstreamFetchDone[1]) { - pJoinInfo->pRight = getNextBlockFromDownstream(pOperator, 1); + pJoinInfo->pRight = getNextBlockFromDownstreamRemain(pOperator, 1); if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstreamNum > 0) { ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstreamNum--; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index f3e989609a..cb01d2be32 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -551,11 +551,15 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR return pOptr; } + void destroyOperator(SOperatorInfo* pOperator) { if (pOperator == NULL) { return; } + freeResetOperatorParams(pOperator, OP_GET_PARAM, true); + freeResetOperatorParams(pOperator, OP_NOTIFY_PARAM, true); + if (pOperator->fpSet.closeFn != NULL) { pOperator->fpSet.closeFn(pOperator->info); } @@ -626,9 +630,12 @@ int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) { taosMemoryFree(pBatch); return TSDB_CODE_OUT_OF_MEMORY; } + tSimpleHashSetFreeFp(pBatch->pBatchs, freeExchangeGetBasicOperatorParam); + tSimpleHashPut(pBatch->pBatchs, &pDExc->basic.vgId, sizeof(pDExc->basic.vgId), &pDExc->basic, sizeof(pDExc->basic)); tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic, sizeof(pSExc->basic)); - destroyOperatorParamValue(pDst->value); + + taosMemoryFree(pDst->value); pDst->value = pBatch; } else { taosArrayAddAll(pDExc->basic.uidList, pSExc->basic.uidList); @@ -668,10 +675,10 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu default: return TSDB_CODE_INVALID_PARA; } + + freeResetOperatorParams(pOperator, type, false); if (NULL == pInput) { - *ppParam = NULL; - taosMemoryFreeClear(*pppDownstramParam); return TSDB_CODE_SUCCESS; } @@ -710,31 +717,19 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu } } - taosArrayClear((*ppParam)->pChildren); + taosArrayDestroy((*ppParam)->pChildren); + (*ppParam)->pChildren = NULL; return TSDB_CODE_SUCCESS; } -SSDataBlock* getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam) { - if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[idx]) { - qDebug("DynOp: op %s start to get block from downstream %s", pOperator->name, pOperator->pDownstream[idx]->name); - SSDataBlock* pBlock = pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pOperator->pDownstreamGetParams[idx]); - if (clearParam) { - pOperator->pDownstreamGetParams[idx] = NULL; - } - return pBlock; - } - - return pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]); -} - SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) { - return getNextBlockFromDownstreamImpl(pOperator, idx, false); + return getNextBlockFromDownstreamImpl(pOperator, idx, true); } -SSDataBlock* getNextBlockFromDownstreamOnce(struct SOperatorInfo* pOperator, int32_t idx) { - return getNextBlockFromDownstreamImpl(pOperator, idx, true); +SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) { + return getNextBlockFromDownstreamImpl(pOperator, idx, false); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index afa4b98ac9..1978ebe9f2 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -924,6 +924,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { if (pOperator->pOperatorGetParam) { pOperator->dynamicTask = true; int32_t code = createTableListInfoFromParam(pOperator); + freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM); pOperator->pOperatorGetParam = NULL; if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 0a6ac902ce..dfce643408 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -992,9 +992,11 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh pGrpCache->batchFetch = pLogicNode->batchFetch; SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; int32_t code = TSDB_CODE_SUCCESS; +/* if (TSDB_CODE_SUCCESS == code) { code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pLogicNode->pGroupCols, &pGrpCache->pGroupCols); } +*/ if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pGrpCache;