diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index cf76d3a167..c4314a57b1 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -126,7 +126,9 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg); } -int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); } +int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); +} int32_t mmStartWorker(SMnodeMgmt *pMgmt) { SSingleWorkerCfg qCfg = { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index eec6bb3fb4..6632480aae 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -72,11 +72,10 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { if (code != 0) { if (terrno != 0) code = terrno; vmSendRsp(pMsg, code); - - dTrace("msg:%p, is freed, code:0x%x", pMsg, code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); } + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); } static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { @@ -87,11 +86,10 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { if (code != 0) { if (terrno != 0) code = terrno; vmSendRsp(pMsg, code); - - dTrace("msg:%p, is freed, code:0x%x", pMsg, code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); } + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); } static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { @@ -149,7 +147,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg *pMsg = NULL; + SRpcMsg * pMsg = NULL; SRpcMsg rsp; for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -190,7 +188,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg *pMsg = NULL; + SRpcMsg * pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); @@ -216,7 +214,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg *pMsg = NULL; + SRpcMsg * pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); @@ -235,7 +233,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO } static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { - SRpcMsg *pRpc = pMsg; + SRpcMsg * pRpc = pMsg; SMsgHead *pHead = pRpc->pCont; int32_t code = 0; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9361c0e6d2..4a3b47c0d1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -129,7 +129,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { return 0; } -int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { +int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) { if (msgType != TDMT_VND_SUBMIT) return 0; void* pIter = NULL; STqExec* pExec = NULL; @@ -239,10 +239,9 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqDataBlkRsp(&abuf, &rsp); - pMsg->pCont = buf; - pMsg->contLen = tlen; - pMsg->code = 0; - tmsgSendRsp(pMsg); + + SRpcMsg resp = {.info = handleInfo, .pCont = buf, .contLen = tlen, .code = 0}; + tmsgSendRsp(&resp); atomic_store_ptr(&pExec->pushHandle.handle, NULL); taosWUnLockLatch(&pExec->pushHandle.lock); @@ -407,9 +406,9 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu pTopic->buffer.output[j].status = 0; STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); SReadHandle handle = { - .reader = pReadHandle, - .meta = pTq->pVnode->pMeta, - .pMsgCb = &pTq->pVnode->msgCb, + .reader = pReadHandle, + .meta = pTq->pVnode->pMeta, + .pMsgCb = &pTq->pVnode->msgCb, }; pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); @@ -663,10 +662,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqDataBlkRsp(&abuf, &rsp); - pMsg->pCont = buf; - pMsg->contLen = tlen; - pMsg->code = 0; - tmsgSendRsp(pMsg); + + SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0}; + tmsgSendRsp(&resp); tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld", TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset); @@ -845,12 +843,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { /*rsp.pBlockData = pRes;*/ /*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/ - pMsg->pCont = buf; - pMsg->contLen = msgLen; - pMsg->code = 0; + SRpcMsg resp = {.info = pMsg->info, pCont = buf, .contLen = msgLen, .code = 0}; tqDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset, pHead->msgType, consumerId, pReq->epoch); - tmsgSendRsp(pMsg); + tmsgSendRsp(&resp); taosMemoryFree(pHead); return 0; } else { @@ -878,10 +874,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqPollRspV2(&abuf, &rspV2); - pMsg->pCont = buf; - pMsg->contLen = tlen; - pMsg->code = 0; - tmsgSendRsp(pMsg); + + SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0}; + tmsgSendRsp(&resp); tqDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch); /*}*/ @@ -990,10 +985,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { for (int32_t i = 0; i < parallel; i++) { STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); SReadHandle handle = { - .reader = pStreamReader, - .meta = pTq->pVnode->pMeta, - .pMsgCb = &pTq->pVnode->msgCb, - .vnode = pTq->pVnode, + .reader = pStreamReader, + .meta = pTq->pVnode->pMeta, + .pMsgCb = &pTq->pVnode->msgCb, + .vnode = pTq->pVnode, }; pTask->exec.runners[i].inputHandle = pStreamReader; pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 60270d3e06..562e550bdc 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -47,7 +47,7 @@ int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code) { SQueryTableRsp rsp = {.code = code}; int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); - void *msg = rpcMallocCont(contLen); + void * msg = rpcMallocCont(contLen); tSerializeSQueryTableRsp(msg, contLen, &rsp); SRpcMsg rpcRsp = { @@ -85,7 +85,7 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo}; int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp); - void *pRsp = rpcMallocCont(contLen); + void * pRsp = rpcMallocCont(contLen); tSerializeSExplainRsp(pRsp, contLen, &rsp); SRpcMsg rpcRsp = { @@ -104,7 +104,7 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); - void *pRsp = rpcMallocCont(contLen); + void * pRsp = rpcMallocCont(contLen); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); SRpcMsg rpcRsp = { @@ -212,7 +212,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { showRsp.tableMeta.numOfColumns = cols; int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp); - void *pBuf = rpcMallocCont(bufLen); + void * pBuf = rpcMallocCont(bufLen); tSerializeSShowRsp(pBuf, bufLen, &showRsp); SRpcMsg rpcMsg = { @@ -341,7 +341,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; SSubQueryMsg *msg = pMsg->pCont; - SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -361,7 +361,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int64_t rId = msg->refId; SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info}; - char *sql = strndup(msg->msg, msg->sqlLen); + char * sql = strndup(msg->msg, msg->sqlLen); QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql); taosMemoryFreeClear(sql); @@ -378,8 +378,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { bool queryDone = false; SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont; bool needStop = false; - SQWTaskCtx *handles = NULL; - SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + SQWTaskCtx * handles = NULL; + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -407,7 +407,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } - SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; SResReadyReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -467,7 +467,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { } SResFetchReq *msg = pMsg->pCont; - SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -505,7 +505,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } - SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; int32_t code = 0; STaskCancelReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { @@ -542,7 +542,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; STaskDropReq *msg = pMsg->pCont; - SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -581,7 +581,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; SSchedulerHbReq req = {0}; - SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; if (NULL == pMsg->pCont) { QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 5c01034f35..5627dbfbf5 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -94,7 +94,9 @@ void rpcFreeCont(void* cont) { if (cont == NULL) { return; } + taosMemoryFree((char*)cont - TRANS_MSG_OVERHEAD); + tTrace("free mem: %p", (char*)cont - TRANS_MSG_OVERHEAD); } void* rpcReallocCont(void* ptr, int contLen) { if (ptr == NULL) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 98e9e67ede..d5c76ccbf2 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -133,6 +133,7 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { } else { p->cap = p->total; p->buf = taosMemoryRealloc(p->buf, p->cap); + tTrace("internal malloc mem: %p", p->buf); uvBuf->base = p->buf + p->len; uvBuf->len = p->cap - p->len; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 84adeb4bf6..da83a6f37f 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -469,6 +469,8 @@ static void uvStartSendResp(SSrvMsg* smsg) { if (pConn->broken == true) { // persist by + transFreeMsg(smsg->msg.pCont); + taosMemoryFree(smsg); transUnrefSrvHandle(pConn); return; }