fix mem leak
This commit is contained in:
parent
44cd0d86ab
commit
ca3aee591f
|
@ -449,7 +449,7 @@ void destroyFlusedPos(void* pRes) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyFlusedppPos(void* ppRes) {
|
void destroyFlusedppPos(void* ppRes) {
|
||||||
void *pRes = *(void **)ppRes;
|
void* pRes = *(void**)ppRes;
|
||||||
destroyFlusedPos(pRes);
|
destroyFlusedPos(pRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -507,7 +507,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pInfo->pMidPullDatas);
|
taosArrayDestroy(pInfo->pMidPullDatas);
|
||||||
|
|
||||||
if (pInfo->pState !=NULL && pInfo->pState->dump == 1) {
|
if (pInfo->pState != NULL && pInfo->pState->dump == 1) {
|
||||||
taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner);
|
taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner);
|
||||||
taosMemoryFreeClear(pInfo->pState->pTdbState);
|
taosMemoryFreeClear(pInfo->pState->pTdbState);
|
||||||
}
|
}
|
||||||
|
@ -548,7 +548,8 @@ void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo
|
||||||
|
|
||||||
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; }
|
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; }
|
||||||
|
|
||||||
int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo, struct SSteamOpBasicInfo* pBasic) {
|
int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo,
|
||||||
|
struct SSteamOpBasicInfo* pBasic) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
|
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
|
||||||
|
@ -1897,9 +1898,8 @@ _end:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
|
||||||
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
|
int32_t numOfChild, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
|
||||||
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
|
|
||||||
QRY_PARAM_CHECK(pOptrInfo);
|
QRY_PARAM_CHECK(pOptrInfo);
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -2042,10 +2042,12 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
|
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
|
||||||
pOperator->fpSet = createOperatorFpSet(NULL, doStreamMidIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo,
|
pOperator->fpSet =
|
||||||
|
createOperatorFpSet(NULL, doStreamMidIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
} else {
|
} else {
|
||||||
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo,
|
pOperator->fpSet =
|
||||||
|
createOperatorFpSet(NULL, doStreamFinalIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
}
|
}
|
||||||
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
|
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
|
||||||
|
@ -2221,9 +2223,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
||||||
pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex);
|
pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex);
|
||||||
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
|
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
|
||||||
pSup->pState->pFileState = NULL;
|
pSup->pState->pFileState = NULL;
|
||||||
code = pSup->stateStore.streamFileStateInit(
|
code = pSup->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize,
|
||||||
tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState,
|
sesionTs, pSup->pState, pTwAggSup->deleteMark, taskIdStr,
|
||||||
pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT, &pSup->pState->pFileState);
|
pHandle->checkpointId, STREAM_STATE_BUFF_SORT, &pSup->pState->pFileState);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
@ -3309,8 +3311,8 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe
|
||||||
QUERY_CHECK_CONDITION((winCode == TSDB_CODE_SUCCESS), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
QUERY_CHECK_CONDITION((winCode == TSDB_CODE_SUCCESS), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
|
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
code =
|
code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo,
|
||||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
sizeof(SResultWindowInfo));
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3772,8 +3774,8 @@ _end:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
|
||||||
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
|
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
|
||||||
QRY_PARAM_CHECK(pOptrInfo);
|
QRY_PARAM_CHECK(pOptrInfo);
|
||||||
|
|
||||||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||||
|
@ -3886,7 +3888,8 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAggNext, NULL, destroyStreamSessionAggOperatorInfo,
|
pOperator->fpSet =
|
||||||
|
createOperatorFpSet(optrDummyOpenFn, doStreamSessionAggNext, NULL, destroyStreamSessionAggOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
||||||
|
|
||||||
|
@ -4102,8 +4105,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||||
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
|
SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle,
|
||||||
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
|
SOperatorInfo** pOptrInfo) {
|
||||||
QRY_PARAM_CHECK(pOptrInfo);
|
QRY_PARAM_CHECK(pOptrInfo);
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -4618,8 +4621,8 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
code =
|
code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo,
|
||||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
sizeof(SResultWindowInfo));
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4846,7 +4849,7 @@ void streamStateReleaseState(SOperatorInfo* pOperator) {
|
||||||
int32_t resSize = winSize + sizeof(TSKEY);
|
int32_t resSize = winSize + sizeof(TSKEY);
|
||||||
char* pBuff = taosMemoryCalloc(1, resSize);
|
char* pBuff = taosMemoryCalloc(1, resSize);
|
||||||
if (!pBuff) {
|
if (!pBuff) {
|
||||||
return ;
|
return;
|
||||||
}
|
}
|
||||||
memcpy(pBuff, pInfo->historyWins->pData, winSize);
|
memcpy(pBuff, pInfo->historyWins->pData, winSize);
|
||||||
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
|
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
|
||||||
|
@ -4944,7 +4947,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFreeClear(pBuf);
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (downstream->fpSet.reloadStreamStateFn) {
|
if (downstream->fpSet.reloadStreamStateFn) {
|
||||||
|
@ -4953,6 +4956,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
|
||||||
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
|
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
taosMemoryFreeClear(pBuf);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||||
}
|
}
|
||||||
|
@ -5383,7 +5387,8 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
||||||
pInfo->pState->pFileState = NULL;
|
pInfo->pState->pFileState = NULL;
|
||||||
code = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
|
code = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
|
||||||
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
||||||
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH, &pInfo->pState->pFileState);
|
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH,
|
||||||
|
&pInfo->pState->pFileState);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
pInfo->pOperator = pOperator;
|
pInfo->pOperator = pOperator;
|
||||||
|
|
Loading…
Reference in New Issue