refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-02-19 09:52:22 +08:00
parent 8a4b79bf2c
commit cf571e4f1f
4 changed files with 11 additions and 9 deletions

View File

@ -745,10 +745,12 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq);
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
typedef struct SStreamTaskCheckpointReq {
int64_t streamId;
int32_t taskId;

View File

@ -317,14 +317,14 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
if (pTask == NULL) {
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
req.dstTaskId);
taosMemoryFree(req.pRetrieve);
tDeleteStreamRetrieveReq(&req);
return -1;
}
int32_t code = 0;
if(pTask->info.taskLevel == TASK_LEVEL__SOURCE){
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamProcessRetrieveReq(pTask, &req);
}else{
} else {
req.srcNodeId = pTask->info.nodeId;
req.srcTaskId = pTask->id.taskId;
code = broadcastRetrieveMsg(pTask, &req);
@ -334,7 +334,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
sendRetrieveRsp(&req, &rsp);
streamMetaReleaseTask(pMeta, pTask);
taosMemoryFree(req.pRetrieve);
tDeleteStreamRetrieveReq(&req);
return code;
}

View File

@ -109,8 +109,6 @@ void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock);
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId);
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);

View File

@ -162,6 +162,8 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
return 0;
}
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
@ -174,7 +176,7 @@ void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
tmsgSendRsp(pRsp);
}
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req){
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq* req) {
int32_t code = 0;
void* buf = NULL;
int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList);