diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index f0b54a82e7..c7a23a2482 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -182,7 +182,7 @@ typedef struct { #define TRANS_RESERVE_SIZE (sizeof(STranConnCtx)) -#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead) + sizeof(STransDigestMsg)) +#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead)) #define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) #define transContFromHead(msg) (msg + sizeof(STransMsgHead)) #define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead)) @@ -201,6 +201,7 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); void transConnCtxDestroy(STransConnCtx* ctx); +void transFreeMsg(void* msg); typedef struct SConnBuffer { char* buf; int len; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 67afd46b1c..4d384814ec 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -107,8 +107,17 @@ static void clientHandleResp(SCliConn* conn) { SRpcInfo* pRpc = pCtx->pTransInst; SRpcMsg rpcMsg; - rpcMsg.pCont = conn->readBuf.buf; - rpcMsg.contLen = conn->readBuf.len; + STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); + pHead->code = htonl(pHead->code); + pHead->msgLen = htonl(pHead->msgLen); + + rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); + rpcMsg.pCont = transContFromHead(pHead); + rpcMsg.code = pHead->code; + rpcMsg.msgType = pHead->msgType; + + // rpcMsg.pCont = conn->readBuf.buf; + // rpcMsg.contLen = conn->readBuf.len; rpcMsg.ahandle = pCtx->ahandle; (pRpc->cfp)(NULL, &rpcMsg, NULL); conn->notifyCount += 1; @@ -128,8 +137,12 @@ static void clientHandleExcept(SCliConn* pConn) { STransConnCtx* pCtx = pMsg->ctx; SRpcInfo* pRpc = pCtx->pTransInst; - SRpcMsg rpcMsg; + transFreeMsg((pMsg->msg.pCont)); + pMsg->msg.pCont = NULL; + + SRpcMsg rpcMsg = {0}; rpcMsg.ahandle = pCtx->ahandle; + rpcMsg.code = -1; // SRpcInfo* pRpc = pMsg->ctx->pRpc; (pRpc->cfp)(NULL, &rpcMsg, NULL); @@ -306,6 +319,10 @@ static void clientDestroy(uv_handle_t* handle) { static void clientWriteCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; + SCliMsg* pMsg = pConn->data; + transFreeMsg((pMsg->msg.pCont)); + pMsg->msg.pCont = NULL; + if (status == 0) { tDebug("conn %p data already was written out", pConn); } else { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 617abeea39..5bece11bec 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -191,4 +191,11 @@ void transConnCtxDestroy(STransConnCtx* ctx) { free(ctx->ip); free(ctx); } + +void transFreeMsg(void* msg) { + if (msg == NULL) { + return; + } + free((char*)msg - sizeof(STransMsgHead)); +} #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 34e621f0f3..c70b1a5b28 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -321,6 +321,10 @@ void uvOnWriteCb(uv_write_t* req, int status) { buf->len = 0; memset(buf->buf, 0, buf->cap); buf->left = -1; + + SRpcMsg* pMsg = &conn->sendMsg; + transFreeMsg(pMsg->pCont); + if (status == 0) { tDebug("conn %p data already was written on stream", conn); } else {