fix(stream):memory leak
This commit is contained in:
parent
13bee74a33
commit
f569355d06
|
@ -706,6 +706,9 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
|
|||
pFillSup->pAllColInfo = destroyFillColumnInfo(pFillSup->pAllColInfo, pFillSup->numOfFillCols, pFillSup->numOfAllCols);
|
||||
tSimpleHashCleanup(pFillSup->pResMap);
|
||||
pFillSup->pResMap = NULL;
|
||||
streamStateReleaseBuf(NULL, NULL, pFillSup->cur.pRowVal);
|
||||
pFillSup->cur.pRowVal = NULL;
|
||||
|
||||
taosMemoryFree(pFillSup);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -722,6 +725,7 @@ void* destroyStreamFillInfo(SStreamFillInfo* pFillInfo) {
|
|||
taosMemoryFreeClear(pFillInfo->pResRow);
|
||||
}
|
||||
pFillInfo->pLinearInfo = destroyStreamFillLinearInfo(pFillInfo->pLinearInfo);
|
||||
taosArrayDestroy(pFillInfo->delRanges);
|
||||
taosMemoryFree(pFillInfo);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -732,6 +736,8 @@ void destroyStreamFillOperatorInfo(void* param) {
|
|||
pInfo->pFillSup = destroyStreamFillSupporter(pInfo->pFillSup);
|
||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||
pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock);
|
||||
pInfo->pPrevSrcBlock = blockDataDestroy(pInfo->pPrevSrcBlock);
|
||||
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes);
|
||||
pInfo->pColMatchColInfo = taosArrayDestroy(pInfo->pColMatchColInfo);
|
||||
taosMemoryFree(pInfo);
|
||||
}
|
||||
|
@ -743,6 +749,7 @@ static void resetFillWindow(SResultRowData* pRowData) {
|
|||
|
||||
void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup, SStreamState* pState) {
|
||||
resetFillWindow(&pFillSup->prev);
|
||||
streamStateReleaseBuf(NULL, NULL, pFillSup->cur.pRowVal);
|
||||
resetFillWindow(&pFillSup->cur);
|
||||
resetFillWindow(&pFillSup->next);
|
||||
resetFillWindow(&pFillSup->nextNext);
|
||||
|
@ -753,12 +760,12 @@ void getCurWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupI
|
|||
resetPrevAndNextWindow(pFillSup, pState);
|
||||
|
||||
SWinKey key = {.ts = ts, .groupId = groupId};
|
||||
void* curVal = NULL;
|
||||
// void* curVal = NULL;
|
||||
int32_t curVLen = 0;
|
||||
int32_t code = streamStateFillGet(pState, &key, (void**)&curVal, &curVLen);
|
||||
int32_t code = streamStateFillGet(pState, &key, (void**)&pFillSup->cur.pRowVal, &curVLen);
|
||||
ASSERT(code == TSDB_CODE_SUCCESS);
|
||||
pFillSup->cur.key = key.ts;
|
||||
pFillSup->cur.pRowVal = curVal;
|
||||
// pFillSup->cur.pRowVal = curVal;
|
||||
}
|
||||
|
||||
void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) {
|
||||
|
@ -777,11 +784,9 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
|
|||
SWinKey preKey = {.groupId = groupId};
|
||||
void* preVal = NULL;
|
||||
int32_t preVLen = 0;
|
||||
if (pCur) {
|
||||
code = streamStateGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen);
|
||||
}
|
||||
code = streamStateGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen);
|
||||
|
||||
if (pCur && code == TSDB_CODE_SUCCESS) {
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pFillSup->prev.key = preKey.ts;
|
||||
pFillSup->prev.pRowVal = preVal;
|
||||
|
||||
|
@ -790,35 +795,36 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
|
|||
|
||||
code = streamStateCurNext(pState, pCur);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
streamStateFreeCur(pCur);
|
||||
pCur = NULL;
|
||||
}
|
||||
} else {
|
||||
streamStateFreeCur(pCur);
|
||||
pCur = streamStateFillSeekKeyNext(pState, &key);
|
||||
}
|
||||
|
||||
if (pCur) {
|
||||
SWinKey nextKey = {.groupId = groupId};
|
||||
void* nextVal = NULL;
|
||||
int32_t nextVLen = 0;
|
||||
code = streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextVLen);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pFillSup->next.key = nextKey.ts;
|
||||
pFillSup->next.pRowVal = nextVal;
|
||||
if (pFillSup->type == TSDB_FILL_PREV || pFillSup->type == TSDB_FILL_NEXT) {
|
||||
code = streamStateCurNext(pState, pCur);
|
||||
SWinKey nextKey = {.groupId = groupId};
|
||||
void* nextVal = NULL;
|
||||
int32_t nextVLen = 0;
|
||||
code = streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextVLen);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pFillSup->next.key = nextKey.ts;
|
||||
pFillSup->next.pRowVal = nextVal;
|
||||
if (pFillSup->type == TSDB_FILL_PREV || pFillSup->type == TSDB_FILL_NEXT) {
|
||||
code = streamStateCurNext(pState, pCur);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
SWinKey nextNextKey = {.groupId = groupId};
|
||||
void* nextNextVal = NULL;
|
||||
int32_t nextNextVLen = 0;
|
||||
code = streamStateGetGroupKVByCur(pCur, &nextNextKey, (const void**)&nextNextVal, &nextNextVLen);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
SWinKey nextNextKey = {.groupId = groupId};
|
||||
void* nextNextVal = NULL;
|
||||
int32_t nextNextVLen = 0;
|
||||
code = streamStateGetGroupKVByCur(pCur, &nextNextKey, (const void**)&nextNextVal, &nextNextVLen);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pFillSup->nextNext.key = nextNextKey.ts;
|
||||
pFillSup->nextNext.pRowVal = nextNextVal;
|
||||
}
|
||||
pFillSup->nextNext.key = nextNextKey.ts;
|
||||
pFillSup->nextNext.pRowVal = nextNextVal;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
streamStateFreeCur(pCur);
|
||||
}
|
||||
|
||||
static bool hasPrevWindow(SStreamFillSupporter* pFillSup) { return pFillSup->prev.key != INT64_MIN; }
|
||||
|
@ -1388,6 +1394,7 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
|
|||
}
|
||||
pInfo->srcDelRowIndex++;
|
||||
}
|
||||
streamStateFreeCur(pCur);
|
||||
doDeleteFillResultImpl(pOperator, ts, endTs, groupId);
|
||||
}
|
||||
pFillInfo->current = pFillInfo->end + 1;
|
||||
|
@ -1538,9 +1545,12 @@ static int32_t initResultBuf(SStreamFillSupporter* pFillSup) {
|
|||
pFillSup->next.key = INT64_MIN;
|
||||
pFillSup->nextNext.key = INT64_MIN;
|
||||
pFillSup->prev.key = INT64_MIN;
|
||||
pFillSup->cur.key = INT64_MIN;
|
||||
pFillSup->next.pRowVal = NULL;
|
||||
pFillSup->nextNext.pRowVal = NULL;
|
||||
pFillSup->prev.pRowVal = NULL;
|
||||
pFillSup->cur.pRowVal = NULL;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -879,7 +879,12 @@ static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
|
|||
int32_t size = taosArrayGetSize(pWins);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SWinKey* pW = taosArrayGet(pWins, i);
|
||||
taosHashRemove(pUpdatedMap, pW, sizeof(SWinKey));
|
||||
void* tmp = taosHashGet(pUpdatedMap, pW, sizeof(SWinKey));
|
||||
if (tmp) {
|
||||
void* value = *(void**)tmp;
|
||||
taosMemoryFree(value);
|
||||
taosHashRemove(pUpdatedMap, pW, sizeof(SWinKey));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1410,7 +1415,12 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int3
|
|||
taosArrayPush(pUpWins, &winRes);
|
||||
}
|
||||
if (pUpdatedMap) {
|
||||
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
|
||||
void* tmp = taosHashGet(pUpdatedMap, &winRes, sizeof(SWinKey));
|
||||
if (tmp) {
|
||||
void* value = *(void**)tmp;
|
||||
taosMemoryFree(value);
|
||||
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
|
||||
}
|
||||
}
|
||||
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
|
||||
} while (win.ekey <= endTsCols[i]);
|
||||
|
@ -2872,12 +2882,13 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SExprSupp* pSup, SAr
|
|||
pChildSup->rowEntryInfoOffset, &pChInfo->aggSup);
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true);
|
||||
compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
|
||||
releaseOutputBuf(pChInfo->pState, pWinRes, pChResult);
|
||||
}
|
||||
if (num > 0 && pUpdatedMap) {
|
||||
saveWinResultInfo(pCurResult->win.skey, pWinRes->groupId, pUpdatedMap);
|
||||
saveOutputBuf(pInfo->pState, pWinRes, pCurResult, pInfo->aggSup.resultRowSize);
|
||||
releaseOutputBuf(pInfo->pState, pWinRes, pCurResult);
|
||||
}
|
||||
releaseOutputBuf(pInfo->pState, pWinRes, pCurResult);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2891,9 +2902,7 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
|
|||
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) {
|
||||
if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
|
||||
SWinKey key = {.ts = pWin->skey, .groupId = groupId};
|
||||
void* pVal = NULL;
|
||||
int32_t size = 0;
|
||||
if (streamStateGet(pState, &key, &pVal, &size) == TSDB_CODE_SUCCESS) {
|
||||
if (streamStateGet(pState, &key, NULL, 0) == TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -196,6 +196,7 @@ int32_t streamStateClear(SStreamState* pState) {
|
|||
SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
|
||||
SWinKey delKey = {0};
|
||||
int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
|
||||
streamStateFreeCur(pCur);
|
||||
if (code == 0) {
|
||||
streamStateDel(pState, &delKey);
|
||||
} else {
|
||||
|
@ -225,6 +226,9 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void*
|
|||
|
||||
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
|
||||
// todo refactor
|
||||
if (!pVal) {
|
||||
return 0;
|
||||
}
|
||||
streamFreeVal(pVal);
|
||||
return 0;
|
||||
}
|
||||
|
@ -236,7 +240,7 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
|
|||
|
||||
int32_t c;
|
||||
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
||||
tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
|
||||
tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c);
|
||||
if (c != 0) {
|
||||
taosMemoryFree(pCur);
|
||||
return NULL;
|
||||
|
@ -253,7 +257,7 @@ SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key)
|
|||
int32_t c;
|
||||
tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
|
||||
if (c != 0) {
|
||||
taosMemoryFree(pCur);
|
||||
streamStateFreeCur(pCur);
|
||||
return NULL;
|
||||
}
|
||||
return pCur;
|
||||
|
@ -266,6 +270,7 @@ SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
|
|||
if (code == 0) {
|
||||
return pCur;
|
||||
}
|
||||
streamStateFreeCur(pCur);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
@ -300,6 +305,9 @@ int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const vo
|
|||
}
|
||||
|
||||
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
|
||||
if (!pCur) {
|
||||
return -1;
|
||||
}
|
||||
uint64_t groupId = pKey->groupId;
|
||||
int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen);
|
||||
if (code == 0) {
|
||||
|
@ -360,7 +368,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
|
|||
|
||||
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
|
||||
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||
if (pCur == NULL) {
|
||||
if (!pCur) {
|
||||
return NULL;
|
||||
}
|
||||
if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) {
|
||||
|
@ -411,6 +419,9 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey*
|
|||
}
|
||||
|
||||
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
|
||||
if (!pCur) {
|
||||
return -1;
|
||||
}
|
||||
//
|
||||
return tdbTbcMoveToNext(pCur->pCur);
|
||||
}
|
||||
|
|
|
@ -3,12 +3,15 @@ system sh/deploy.sh -n dnode1 -i 1
|
|||
system sh/deploy.sh -n dnode2 -i 2
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
#==system sh/exec.sh -n dnode1 -s start -v
|
||||
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
sql create dnode $hostname2 port 7200
|
||||
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
#==system sh/exec.sh -n dnode2 -s start -v
|
||||
|
||||
print ===== step1
|
||||
$x = 0
|
||||
|
@ -232,4 +235,41 @@ endi
|
|||
|
||||
print loop3 over
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#==system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
#==print =============== check
|
||||
#==$null=
|
||||
|
||||
#==system_content sh/checkValgrind.sh -n dnode1
|
||||
#==print cmd return result ----> [ $system_content ]
|
||||
#==if $system_content > 0 then
|
||||
#== return -1
|
||||
#==endi
|
||||
|
||||
#==if $system_content == $null then
|
||||
#== return -1
|
||||
#==endi
|
||||
|
||||
|
||||
|
||||
#==system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
#==print =============== check
|
||||
#==$null=
|
||||
|
||||
#==system_content sh/checkValgrind.sh -n dnode2
|
||||
#==print cmd return result ----> [ $system_content ]
|
||||
#==if $system_content > 0 then
|
||||
#== return -1
|
||||
#==endi
|
||||
|
||||
#==if $system_content == $null then
|
||||
#== return -1
|
||||
#==endi
|
||||
#==return 1
|
||||
|
||||
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
|
|
|
@ -4,6 +4,8 @@ looptest:
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
#==system sh/exec.sh -n dnode1 -s start -v
|
||||
|
||||
sleep 200
|
||||
sql connect
|
||||
|
||||
|
@ -353,6 +355,22 @@ endi
|
|||
|
||||
|
||||
|
||||
#==system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
#==print =============== check
|
||||
#==$null=
|
||||
|
||||
#==system_content sh/checkValgrind.sh -n dnode1
|
||||
#==print cmd return result ----> [ $system_content ]
|
||||
#==if $system_content > 0 then
|
||||
#== return -1
|
||||
#==endi
|
||||
|
||||
#==if $system_content == $null then
|
||||
#== return -1
|
||||
#==endi
|
||||
#==return 1
|
||||
|
||||
|
||||
|
||||
|
||||
sql drop stream if exists streams0;
|
||||
|
|
|
@ -4,6 +4,8 @@ looptest:
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
#==system sh/exec.sh -n dnode1 -s start -v
|
||||
|
||||
sleep 200
|
||||
sql connect
|
||||
|
||||
|
@ -671,6 +673,22 @@ endi
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
#==system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
#==print =============== check
|
||||
#==$null=
|
||||
|
||||
#==system_content sh/checkValgrind.sh -n dnode1
|
||||
#==print cmd return result ----> [ $system_content ]
|
||||
#==if $system_content > 0 then
|
||||
#== return -1
|
||||
#==endi
|
||||
|
||||
#==if $system_content == $null then
|
||||
#== return -1
|
||||
#==endi
|
||||
#==return 1
|
||||
|
||||
|
||||
sql drop stream if exists streams0;
|
||||
|
|
|
@ -4,6 +4,8 @@ looptest:
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
#==system sh/exec.sh -n dnode1 -s start -v
|
||||
|
||||
sleep 200
|
||||
sql connect
|
||||
|
||||
|
@ -1011,6 +1013,21 @@ endi
|
|||
|
||||
|
||||
|
||||
|
||||
#==system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
#==print =============== check
|
||||
#==$null=
|
||||
|
||||
#==system_content sh/checkValgrind.sh -n dnode1
|
||||
#==print cmd return result ----> [ $system_content ]
|
||||
#==if $system_content > 0 then
|
||||
#== return -1
|
||||
#==endi
|
||||
|
||||
#==if $system_content == $null then
|
||||
#== return -1
|
||||
#==endi
|
||||
#==return 1
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ looptest:
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
#==system sh/exec.sh -n dnode1 -s start -v
|
||||
sleep 200
|
||||
sql connect
|
||||
|
||||
|
@ -463,6 +464,20 @@ endi
|
|||
|
||||
|
||||
|
||||
#==system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
#==print =============== check
|
||||
#==$null=
|
||||
|
||||
#==system_content sh/checkValgrind.sh -n dnode1
|
||||
#==print cmd return result ----> [ $system_content ]
|
||||
#==if $system_content > 0 then
|
||||
#== return -1
|
||||
#==endi
|
||||
|
||||
#==if $system_content == $null then
|
||||
#== return -1
|
||||
#==endi
|
||||
#==return 1
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue