fix: tq mem leak
This commit is contained in:
parent
7b1833a6f8
commit
afda7c637b
|
@ -126,7 +126,9 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg);
|
||||
}
|
||||
|
||||
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); }
|
||||
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg);
|
||||
}
|
||||
|
||||
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
||||
SSingleWorkerCfg qCfg = {
|
||||
|
|
|
@ -72,11 +72,10 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
if (code != 0) {
|
||||
if (terrno != 0) code = terrno;
|
||||
vmSendRsp(pMsg, code);
|
||||
|
||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
|
@ -87,11 +86,10 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
if (code != 0) {
|
||||
if (terrno != 0) code = terrno;
|
||||
vmSendRsp(pMsg, code);
|
||||
|
||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
|
@ -149,7 +147,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
|||
|
||||
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
SRpcMsg * pMsg = NULL;
|
||||
SRpcMsg rsp;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
|
@ -190,7 +188,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
|||
|
||||
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
SRpcMsg * pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, (void **)&pMsg);
|
||||
|
@ -216,7 +214,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
|
|||
|
||||
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
SRpcMsg * pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, (void **)&pMsg);
|
||||
|
@ -235,7 +233,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
|||
}
|
||||
|
||||
static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
|
||||
SRpcMsg *pRpc = pMsg;
|
||||
SRpcMsg * pRpc = pMsg;
|
||||
SMsgHead *pHead = pRpc->pCont;
|
||||
int32_t code = 0;
|
||||
|
||||
|
|
|
@ -129,7 +129,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
||||
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
|
||||
if (msgType != TDMT_VND_SUBMIT) return 0;
|
||||
void* pIter = NULL;
|
||||
STqExec* pExec = NULL;
|
||||
|
@ -239,10 +239,9 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
|||
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
tEncodeSMqDataBlkRsp(&abuf, &rsp);
|
||||
pMsg->pCont = buf;
|
||||
pMsg->contLen = tlen;
|
||||
pMsg->code = 0;
|
||||
tmsgSendRsp(pMsg);
|
||||
|
||||
SRpcMsg resp = {.info = handleInfo, .pCont = buf, .contLen = tlen, .code = 0};
|
||||
tmsgSendRsp(&resp);
|
||||
|
||||
atomic_store_ptr(&pExec->pushHandle.handle, NULL);
|
||||
taosWUnLockLatch(&pExec->pushHandle.lock);
|
||||
|
@ -407,9 +406,9 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
|
|||
pTopic->buffer.output[j].status = 0;
|
||||
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
SReadHandle handle = {
|
||||
.reader = pReadHandle,
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
.reader = pReadHandle,
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
};
|
||||
pTopic->buffer.output[j].pReadHandle = pReadHandle;
|
||||
pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
|
||||
|
@ -663,10 +662,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
tEncodeSMqDataBlkRsp(&abuf, &rsp);
|
||||
pMsg->pCont = buf;
|
||||
pMsg->contLen = tlen;
|
||||
pMsg->code = 0;
|
||||
tmsgSendRsp(pMsg);
|
||||
|
||||
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
|
||||
tmsgSendRsp(&resp);
|
||||
|
||||
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
|
||||
TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
|
||||
|
@ -845,12 +843,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
/*rsp.pBlockData = pRes;*/
|
||||
|
||||
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
|
||||
pMsg->pCont = buf;
|
||||
pMsg->contLen = msgLen;
|
||||
pMsg->code = 0;
|
||||
SRpcMsg resp = {.info = pMsg->info, pCont = buf, .contLen = msgLen, .code = 0};
|
||||
tqDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
|
||||
pHead->msgType, consumerId, pReq->epoch);
|
||||
tmsgSendRsp(pMsg);
|
||||
tmsgSendRsp(&resp);
|
||||
taosMemoryFree(pHead);
|
||||
return 0;
|
||||
} else {
|
||||
|
@ -878,10 +874,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
tEncodeSMqPollRspV2(&abuf, &rspV2);
|
||||
pMsg->pCont = buf;
|
||||
pMsg->contLen = tlen;
|
||||
pMsg->code = 0;
|
||||
tmsgSendRsp(pMsg);
|
||||
|
||||
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
|
||||
tmsgSendRsp(&resp);
|
||||
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
|
||||
pReq->epoch);
|
||||
/*}*/
|
||||
|
@ -990,10 +985,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
|
|||
for (int32_t i = 0; i < parallel; i++) {
|
||||
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
SReadHandle handle = {
|
||||
.reader = pStreamReader,
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
.vnode = pTq->pVnode,
|
||||
.reader = pStreamReader,
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
.vnode = pTq->pVnode,
|
||||
};
|
||||
pTask->exec.runners[i].inputHandle = pStreamReader;
|
||||
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
|
|
|
@ -47,7 +47,7 @@ int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code) {
|
|||
SQueryTableRsp rsp = {.code = code};
|
||||
|
||||
int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
|
||||
void *msg = rpcMallocCont(contLen);
|
||||
void * msg = rpcMallocCont(contLen);
|
||||
tSerializeSQueryTableRsp(msg, contLen, &rsp);
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
|
@ -85,7 +85,7 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn
|
|||
SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};
|
||||
|
||||
int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
|
||||
void *pRsp = rpcMallocCont(contLen);
|
||||
void * pRsp = rpcMallocCont(contLen);
|
||||
tSerializeSExplainRsp(pRsp, contLen, &rsp);
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
|
@ -104,7 +104,7 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn
|
|||
|
||||
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
|
||||
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
|
||||
void *pRsp = rpcMallocCont(contLen);
|
||||
void * pRsp = rpcMallocCont(contLen);
|
||||
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
|
@ -212,7 +212,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
|
|||
showRsp.tableMeta.numOfColumns = cols;
|
||||
|
||||
int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp);
|
||||
void *pBuf = rpcMallocCont(bufLen);
|
||||
void * pBuf = rpcMallocCont(bufLen);
|
||||
tSerializeSShowRsp(pBuf, bufLen, &showRsp);
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -341,7 +341,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t code = 0;
|
||||
SSubQueryMsg *msg = pMsg->pCont;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
||||
QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -361,7 +361,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
int64_t rId = msg->refId;
|
||||
|
||||
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
|
||||
char *sql = strndup(msg->msg, msg->sqlLen);
|
||||
char * sql = strndup(msg->msg, msg->sqlLen);
|
||||
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql);
|
||||
taosMemoryFreeClear(sql);
|
||||
|
||||
|
@ -378,8 +378,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
bool queryDone = false;
|
||||
SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
|
||||
bool needStop = false;
|
||||
SQWTaskCtx *handles = NULL;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SQWTaskCtx * handles = NULL;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -407,7 +407,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SResReadyReq *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -467,7 +467,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
SResFetchReq *msg = pMsg->pCont;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -505,7 +505,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
int32_t code = 0;
|
||||
STaskCancelReq *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
|
@ -542,7 +542,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t code = 0;
|
||||
STaskDropReq *msg = pMsg->pCont;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -581,7 +581,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t code = 0;
|
||||
SSchedulerHbReq req = {0};
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == pMsg->pCont) {
|
||||
QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
|
||||
|
|
|
@ -94,7 +94,9 @@ void rpcFreeCont(void* cont) {
|
|||
if (cont == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosMemoryFree((char*)cont - TRANS_MSG_OVERHEAD);
|
||||
tTrace("free mem: %p", (char*)cont - TRANS_MSG_OVERHEAD);
|
||||
}
|
||||
void* rpcReallocCont(void* ptr, int contLen) {
|
||||
if (ptr == NULL) {
|
||||
|
|
|
@ -133,6 +133,7 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
|
|||
} else {
|
||||
p->cap = p->total;
|
||||
p->buf = taosMemoryRealloc(p->buf, p->cap);
|
||||
tTrace("internal malloc mem: %p", p->buf);
|
||||
|
||||
uvBuf->base = p->buf + p->len;
|
||||
uvBuf->len = p->cap - p->len;
|
||||
|
|
|
@ -469,6 +469,8 @@ static void uvStartSendResp(SSrvMsg* smsg) {
|
|||
|
||||
if (pConn->broken == true) {
|
||||
// persist by
|
||||
transFreeMsg(smsg->msg.pCont);
|
||||
taosMemoryFree(smsg);
|
||||
transUnrefSrvHandle(pConn);
|
||||
return;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue