fix: tq mem leak
This commit is contained in:
parent
afda7c637b
commit
4a786f1ad0
|
@ -162,7 +162,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
vTrace("message in fetch queue is processing");
|
vTrace("message in fetch queue is processing");
|
||||||
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char * msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_VND_FETCH:
|
case TDMT_VND_FETCH:
|
||||||
|
@ -184,8 +184,12 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
case TDMT_VND_TASK_PIPE_EXEC:
|
case TDMT_VND_TASK_PIPE_EXEC:
|
||||||
case TDMT_VND_TASK_MERGE_EXEC:
|
case TDMT_VND_TASK_MERGE_EXEC:
|
||||||
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
|
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
|
||||||
case TDMT_VND_STREAM_TRIGGER:
|
case TDMT_VND_STREAM_TRIGGER: {
|
||||||
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
|
// refactor, avoid double free
|
||||||
|
int code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
case TDMT_VND_QUERY_HEARTBEAT:
|
case TDMT_VND_QUERY_HEARTBEAT:
|
||||||
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
|
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
|
||||||
default:
|
default:
|
||||||
|
@ -328,12 +332,12 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
int rcode = 0;
|
int rcode = 0;
|
||||||
SVCreateTbBatchReq req = {0};
|
SVCreateTbBatchReq req = {0};
|
||||||
SVCreateTbReq *pCreateReq;
|
SVCreateTbReq * pCreateReq;
|
||||||
SVCreateTbBatchRsp rsp = {0};
|
SVCreateTbBatchRsp rsp = {0};
|
||||||
SVCreateTbRsp cRsp = {0};
|
SVCreateTbRsp cRsp = {0};
|
||||||
char tbName[TSDB_TABLE_FNAME_LEN];
|
char tbName[TSDB_TABLE_FNAME_LEN];
|
||||||
STbUidStore *pStore = NULL;
|
STbUidStore * pStore = NULL;
|
||||||
SArray *tbUids = NULL;
|
SArray * tbUids = NULL;
|
||||||
|
|
||||||
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
|
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
|
||||||
pRsp->code = TSDB_CODE_SUCCESS;
|
pRsp->code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -517,7 +521,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
int ret;
|
int ret;
|
||||||
SArray *tbUids = NULL;
|
SArray * tbUids = NULL;
|
||||||
|
|
||||||
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
|
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
|
||||||
pRsp->pCont = NULL;
|
pRsp->pCont = NULL;
|
||||||
|
@ -572,9 +576,9 @@ _exit:
|
||||||
|
|
||||||
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
|
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
|
||||||
SSubmitBlkIter blkIter = {0};
|
SSubmitBlkIter blkIter = {0};
|
||||||
STSchema *pSchema = NULL;
|
STSchema * pSchema = NULL;
|
||||||
tb_uid_t suid = 0;
|
tb_uid_t suid = 0;
|
||||||
STSRow *row = NULL;
|
STSRow * row = NULL;
|
||||||
|
|
||||||
tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
|
tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
|
||||||
if (blkIter.row == NULL) return 0;
|
if (blkIter.row == NULL) return 0;
|
||||||
|
@ -605,8 +609,8 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub
|
||||||
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
|
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
|
||||||
ASSERT(pMsg != NULL);
|
ASSERT(pMsg != NULL);
|
||||||
SSubmitMsgIter msgIter = {0};
|
SSubmitMsgIter msgIter = {0};
|
||||||
SMeta *pMeta = pVnode->pMeta;
|
SMeta * pMeta = pVnode->pMeta;
|
||||||
SSubmitBlk *pBlock = NULL;
|
SSubmitBlk * pBlock = NULL;
|
||||||
|
|
||||||
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -620,10 +624,10 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char
|
||||||
}
|
}
|
||||||
|
|
||||||
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
|
SSubmitReq * pSubmitReq = (SSubmitReq *)pReq;
|
||||||
SSubmitRsp submitRsp = {0};
|
SSubmitRsp submitRsp = {0};
|
||||||
SSubmitMsgIter msgIter = {0};
|
SSubmitMsgIter msgIter = {0};
|
||||||
SSubmitBlk *pBlock;
|
SSubmitBlk * pBlock;
|
||||||
SSubmitRsp rsp = {0};
|
SSubmitRsp rsp = {0};
|
||||||
SVCreateTbReq createTbReq = {0};
|
SVCreateTbReq createTbReq = {0};
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
|
|
Loading…
Reference in New Issue