fix:memory leak
This commit is contained in:
parent
e7593b4bcf
commit
3241f9bda5
|
@ -62,10 +62,14 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
|
SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
|
||||||
streamProcessDispatchMsg(pTask, &req, &rsp);
|
streamProcessDispatchMsg(pTask, &req, &rsp);
|
||||||
|
tDeleteStreamDispatchReq(&req);
|
||||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
return;
|
return;
|
||||||
|
} else {
|
||||||
|
tDeleteStreamDispatchReq(&req);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
FAIL:
|
FAIL:
|
||||||
|
@ -450,14 +454,17 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
|
tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
|
||||||
tDecodeStreamDispatchReq(&decoder, &req);
|
tDecodeStreamDispatchReq(&decoder, &req);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
|
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||||
streamProcessDispatchMsg(pTask, &req, &rsp);
|
streamProcessDispatchMsg(pTask, &req, &rsp);
|
||||||
|
tDeleteStreamDispatchReq(&req);
|
||||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
tDeleteStreamDispatchReq(&req);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1362,6 +1362,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||||
streamProcessDispatchMsg(pTask, &req, &rsp);
|
streamProcessDispatchMsg(pTask, &req, &rsp);
|
||||||
|
tDeleteStreamDispatchReq(&req);
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1600,6 +1601,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||||
streamProcessDispatchMsg(pTask, &req, &rsp);
|
streamProcessDispatchMsg(pTask, &req, &rsp);
|
||||||
|
tDeleteStreamDispatchReq(&req);
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
|
|
|
@ -283,7 +283,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
tmsgSendRsp(pRsp);
|
tmsgSendRsp(pRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
tDeleteStreamDispatchReq(pReq);
|
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -519,6 +519,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) {
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
stError("failed to open rocksdb, path:%s, reason:%s", backendPath, err);
|
stError("failed to open rocksdb, path:%s, reason:%s", backendPath, err);
|
||||||
taosMemoryFreeClear(err);
|
taosMemoryFreeClear(err);
|
||||||
|
rocksdb_list_column_families_destroy(cfs, nCf);
|
||||||
goto _EXIT;
|
goto _EXIT;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -20,9 +20,9 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/window_close_session_ext.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/window_close_session_ext.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/partition_interval.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/partition_interval.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/pause_resume_test.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/pause_resume_test.py
|
||||||
#,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/vnode_restart.py -N 4
|
#,,n,system-test,python3 ./test.py -f 8-stream/vnode_restart.py -N 4
|
||||||
#,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/snode_restart.py -N 4
|
#,,n,system-test,python3 ./test.py -f 8-stream/snode_restart.py -N 4
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4
|
,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4
|
||||||
|
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py
|
||||||
|
|
Loading…
Reference in New Issue