diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c9d0eebe2b..d561d86472 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -297,7 +297,6 @@ typedef struct SPartitionBySupporter { typedef struct SPartitionDataInfo { uint64_t groupId; char* tbname; - SArray* tags; SArray* rowIds; } SPartitionDataInfo; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 280edf8b53..e2ba2a07c8 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1215,6 +1215,11 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) { return pBlock; } +void freePartItem(void* ptr) { + SPartitionDataInfo* pPart = (SPartitionDataInfo*)ptr; + taosArrayDestroy(pPart->rowIds); +} + SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -1293,6 +1298,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + taosHashSetFreeFp(pInfo->pPartitions, freePartItem); pInfo->tsColIndex = 0; pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 0fb78fb589..918a203df4 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -149,7 +149,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t batchCnt = 0; while (1) { if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return 0; } @@ -203,6 +203,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { code = streamTaskOutput(pTask, qRes); if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { taosFreeQitem(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return code; }