From cf571e4f1f2e5afa9099b3c0712caa2f071e6e44 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Feb 2024 09:52:22 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 6 ++++-- source/dnode/vnode/src/tqCommon/tqCommon.c | 8 ++++---- source/libs/stream/inc/streamInt.h | 2 -- source/libs/stream/src/streamDispatch.c | 4 +++- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d6d89d1d30..b11db04e82 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -745,10 +745,12 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq); int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); - -int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); +int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); +int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); +void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq); + typedef struct SStreamTaskCheckpointReq { int64_t streamId; int32_t taskId; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 9a22bc303b..c9dd20ef97 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -317,14 +317,14 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { if (pTask == NULL) { tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, req.dstTaskId); - taosMemoryFree(req.pRetrieve); + tDeleteStreamRetrieveReq(&req); return -1; } int32_t code = 0; - if(pTask->info.taskLevel == TASK_LEVEL__SOURCE){ + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamProcessRetrieveReq(pTask, &req); - }else{ + } else { req.srcNodeId = pTask->info.nodeId; req.srcTaskId = pTask->id.taskId; code = broadcastRetrieveMsg(pTask, &req); @@ -334,7 +334,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { sendRetrieveRsp(&req, &rsp); streamMetaReleaseTask(pMeta, pTask); - taosMemoryFree(req.pRetrieve); + tDeleteStreamRetrieveReq(&req); return code; } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 300b0a7f24..87f63b48ed 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -109,8 +109,6 @@ void destroyStreamDataBlock(SStreamDataBlock* pBlock); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock); -int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); - int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId); int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 98d9a29c87..59f110769c 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -162,6 +162,8 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) { return 0; } +void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); } + void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){ void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp)); ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId); @@ -174,7 +176,7 @@ void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){ tmsgSendRsp(pRsp); } -int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req){ +int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq* req) { int32_t code = 0; void* buf = NULL; int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList);