fix(stream):mem leak
This commit is contained in:
parent
cda0087d40
commit
cdaa5f9801
|
@ -944,6 +944,7 @@ int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
|
||||||
void resetUnCloseSessionWinInfo(SSHashObj* winMap);
|
void resetUnCloseSessionWinInfo(SSHashObj* winMap);
|
||||||
void setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
|
void setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
|
||||||
void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup);
|
void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup);
|
||||||
|
void destroyFlusedPos(void* pRes);
|
||||||
|
|
||||||
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
|
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
|
||||||
void* decodeSSessionKey(void* buf, SSessionKey* key);
|
void* decodeSSessionKey(void* buf, SSessionKey* key);
|
||||||
|
|
|
@ -50,12 +50,13 @@ void destroyStreamCountAggOperatorInfo(void* param) {
|
||||||
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
||||||
cleanupExprSupp(&pInfo->scalarSupp);
|
cleanupExprSupp(&pInfo->scalarSupp);
|
||||||
clearGroupResInfo(&pInfo->groupResInfo);
|
clearGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
|
||||||
|
pInfo->pUpdated = NULL;
|
||||||
|
|
||||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||||
blockDataDestroy(pInfo->pDelRes);
|
blockDataDestroy(pInfo->pDelRes);
|
||||||
tSimpleHashCleanup(pInfo->pStUpdated);
|
tSimpleHashCleanup(pInfo->pStUpdated);
|
||||||
tSimpleHashCleanup(pInfo->pStDeleted);
|
tSimpleHashCleanup(pInfo->pStDeleted);
|
||||||
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
|
|
||||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->historyWins);
|
taosArrayDestroy(pInfo->historyWins);
|
||||||
|
|
|
@ -46,6 +46,9 @@ void destroyStreamEventOperatorInfo(void* param) {
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
||||||
clearGroupResInfo(&pInfo->groupResInfo);
|
clearGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
|
||||||
|
pInfo->pUpdated = NULL;
|
||||||
|
|
||||||
cleanupExprSupp(&pInfo->scalarSupp);
|
cleanupExprSupp(&pInfo->scalarSupp);
|
||||||
if (pInfo->pChildren != NULL) {
|
if (pInfo->pChildren != NULL) {
|
||||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
|
@ -60,7 +63,6 @@ void destroyStreamEventOperatorInfo(void* param) {
|
||||||
tSimpleHashCleanup(pInfo->pSeUpdated);
|
tSimpleHashCleanup(pInfo->pSeUpdated);
|
||||||
tSimpleHashCleanup(pInfo->pAllUpdated);
|
tSimpleHashCleanup(pInfo->pAllUpdated);
|
||||||
tSimpleHashCleanup(pInfo->pSeDeleted);
|
tSimpleHashCleanup(pInfo->pSeDeleted);
|
||||||
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
|
|
||||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->historyWins);
|
taosArrayDestroy(pInfo->historyWins);
|
||||||
|
|
|
@ -388,15 +388,20 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void destroyFlusedPos(void* pRes) {
|
||||||
|
SRowBuffPos* pPos = (SRowBuffPos*) pRes;
|
||||||
|
if (!pPos->needFree && !pPos->pRowBuff) {
|
||||||
|
taosMemoryFreeClear(pPos->pKey);
|
||||||
|
taosMemoryFree(pPos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void clearGroupResInfo(SGroupResInfo* pGroupResInfo) {
|
void clearGroupResInfo(SGroupResInfo* pGroupResInfo) {
|
||||||
if (pGroupResInfo->freeItem) {
|
if (pGroupResInfo->freeItem) {
|
||||||
int32_t size = taosArrayGetSize(pGroupResInfo->pRows);
|
int32_t size = taosArrayGetSize(pGroupResInfo->pRows);
|
||||||
for (int32_t i = pGroupResInfo->index; i < size; i++) {
|
for (int32_t i = pGroupResInfo->index; i < size; i++) {
|
||||||
SRowBuffPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
void* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
||||||
if (!pPos->needFree && !pPos->pRowBuff) {
|
destroyFlusedPos(pPos);
|
||||||
taosMemoryFreeClear(pPos->pKey);
|
|
||||||
taosMemoryFree(pPos);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
pGroupResInfo->freeItem = false;
|
pGroupResInfo->freeItem = false;
|
||||||
}
|
}
|
||||||
|
@ -409,6 +414,8 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
cleanupAggSup(&pInfo->aggSup);
|
cleanupAggSup(&pInfo->aggSup);
|
||||||
clearGroupResInfo(&pInfo->groupResInfo);
|
clearGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
|
||||||
|
pInfo->pUpdated = NULL;
|
||||||
|
|
||||||
// it should be empty.
|
// it should be empty.
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
|
@ -437,7 +444,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
|
||||||
cleanupExprSupp(&pInfo->scalarSupp);
|
cleanupExprSupp(&pInfo->scalarSupp);
|
||||||
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
||||||
pInfo->pUpdatedMap = NULL;
|
pInfo->pUpdatedMap = NULL;
|
||||||
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
|
|
||||||
tSimpleHashCleanup(pInfo->pDeletedMap);
|
tSimpleHashCleanup(pInfo->pDeletedMap);
|
||||||
|
|
||||||
blockDataDestroy(pInfo->pCheckpointRes);
|
blockDataDestroy(pInfo->pCheckpointRes);
|
||||||
|
@ -1664,6 +1670,8 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
|
||||||
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
||||||
cleanupExprSupp(&pInfo->scalarSupp);
|
cleanupExprSupp(&pInfo->scalarSupp);
|
||||||
clearGroupResInfo(&pInfo->groupResInfo);
|
clearGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
|
||||||
|
pInfo->pUpdated = NULL;
|
||||||
|
|
||||||
if (pInfo->pChildren != NULL) {
|
if (pInfo->pChildren != NULL) {
|
||||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
|
@ -1679,7 +1687,6 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
|
||||||
blockDataDestroy(pInfo->pWinBlock);
|
blockDataDestroy(pInfo->pWinBlock);
|
||||||
tSimpleHashCleanup(pInfo->pStUpdated);
|
tSimpleHashCleanup(pInfo->pStUpdated);
|
||||||
tSimpleHashCleanup(pInfo->pStDeleted);
|
tSimpleHashCleanup(pInfo->pStDeleted);
|
||||||
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
|
|
||||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->historyWins);
|
taosArrayDestroy(pInfo->historyWins);
|
||||||
|
@ -3250,6 +3257,9 @@ void destroyStreamStateOperatorInfo(void* param) {
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
||||||
clearGroupResInfo(&pInfo->groupResInfo);
|
clearGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
|
||||||
|
pInfo->pUpdated = NULL;
|
||||||
|
|
||||||
cleanupExprSupp(&pInfo->scalarSupp);
|
cleanupExprSupp(&pInfo->scalarSupp);
|
||||||
if (pInfo->pChildren != NULL) {
|
if (pInfo->pChildren != NULL) {
|
||||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
|
@ -3263,7 +3273,6 @@ void destroyStreamStateOperatorInfo(void* param) {
|
||||||
blockDataDestroy(pInfo->pDelRes);
|
blockDataDestroy(pInfo->pDelRes);
|
||||||
tSimpleHashCleanup(pInfo->pSeUpdated);
|
tSimpleHashCleanup(pInfo->pSeUpdated);
|
||||||
tSimpleHashCleanup(pInfo->pSeDeleted);
|
tSimpleHashCleanup(pInfo->pSeDeleted);
|
||||||
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
|
|
||||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->historyWins);
|
taosArrayDestroy(pInfo->historyWins);
|
||||||
|
|
Loading…
Reference in New Issue