fix mem leak
This commit is contained in:
parent
d33bcc9977
commit
617d8d4013
|
@ -912,10 +912,12 @@ void tmqFreeImpl(void* handle) {
|
||||||
tmq_t* tmq = (tmq_t*)handle;
|
tmq_t* tmq = (tmq_t*)handle;
|
||||||
|
|
||||||
// TODO stop timer
|
// TODO stop timer
|
||||||
|
if (tmq->mqueue) {
|
||||||
tmqClearUnhandleMsg(tmq);
|
tmqClearUnhandleMsg(tmq);
|
||||||
if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
|
taosCloseQueue(tmq->mqueue);
|
||||||
|
}
|
||||||
if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
|
if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
|
||||||
if (tmq->qall) taosFreeQall(tmq->qall);
|
taosFreeQall(tmq->qall);
|
||||||
|
|
||||||
tsem_destroy(&tmq->rspSem);
|
tsem_destroy(&tmq->rspSem);
|
||||||
|
|
||||||
|
|
|
@ -58,11 +58,7 @@ static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
dTrace("msg:%p, get from snode-stream queue", pMsg);
|
dTrace("msg:%p, get from snode-stream queue", pMsg);
|
||||||
int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pMsg);
|
int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pMsg);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
if (pMsg) {
|
|
||||||
dGError("snd, msg:%p failed to process stream msg %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr(code));
|
dGError("snd, msg:%p failed to process stream msg %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr(code));
|
||||||
} else {
|
|
||||||
dGError("snd, msg:%p failed to process stream empty msg since %s", pMsg, terrstr(code));
|
|
||||||
}
|
|
||||||
smSendRsp(pMsg, terrno);
|
smSendRsp(pMsg, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -86,12 +86,8 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
|
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
if (terrno != 0) code = terrno;
|
if (terrno != 0) code = terrno;
|
||||||
if (pMsg) {
|
|
||||||
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
|
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
|
||||||
terrstr(code));
|
terrstr(code));
|
||||||
} else {
|
|
||||||
dGError("vgId:%d, msg:%p failed to process stream empty msg since %s", pVnode->vgId, pMsg, terrstr(code));
|
|
||||||
}
|
|
||||||
vmSendRsp(pMsg, code);
|
vmSendRsp(pMsg, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,8 +150,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg, terrstr(),
|
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
|
||||||
TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
|
terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
|
||||||
return terrno != 0 ? terrno : -1;
|
return terrno != 0 ? terrno : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -284,8 +284,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
|
|
||||||
handle.pRef = walOpenRef(pTq->pVnode->pWal);
|
handle.pRef = walOpenRef(pTq->pVnode->pWal);
|
||||||
if (handle.pRef == NULL) {
|
if (handle.pRef == NULL) {
|
||||||
ASSERT(0);
|
continue;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
walRefVer(handle.pRef, handle.snapshotVer);
|
walRefVer(handle.pRef, handle.snapshotVer);
|
||||||
|
|
||||||
|
|
|
@ -46,20 +46,25 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
|
||||||
}
|
}
|
||||||
int32_t size = htonl(head.size);
|
int32_t size = htonl(head.size);
|
||||||
void* memBuf = taosMemoryCalloc(1, size);
|
void* memBuf = taosMemoryCalloc(1, size);
|
||||||
|
if (memBuf == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
if ((code = taosReadFile(pFile, memBuf, size)) != size) {
|
if ((code = taosReadFile(pFile, memBuf, size)) != size) {
|
||||||
ASSERT(0);
|
taosMemoryFree(memBuf);
|
||||||
// TODO handle error
|
return -1;
|
||||||
}
|
}
|
||||||
STqOffset offset;
|
STqOffset offset;
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, memBuf, size);
|
tDecoderInit(&decoder, memBuf, size);
|
||||||
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
|
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
|
||||||
ASSERT(0);
|
taosMemoryFree(memBuf);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
|
if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
|
||||||
ASSERT(0);
|
return -1;
|
||||||
// TODO
|
|
||||||
}
|
}
|
||||||
taosMemoryFree(memBuf);
|
taosMemoryFree(memBuf);
|
||||||
}
|
}
|
||||||
|
@ -124,7 +129,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname,
|
tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname,
|
||||||
sysErrStr);
|
sysErrStr);
|
||||||
ASSERT(0);
|
taosMemoryFree(fname);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosMemoryFree(fname);
|
taosMemoryFree(fname);
|
||||||
|
|
|
@ -64,7 +64,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
|
||||||
.startTs = startTs,
|
.startTs = startTs,
|
||||||
.endTs = endTs,
|
.endTs = endTs,
|
||||||
};
|
};
|
||||||
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
|
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1);
|
||||||
taosMemoryFree(name);
|
taosMemoryFree(name);
|
||||||
/*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/
|
/*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/
|
||||||
taosArrayPush(deleteReq->deleteReqs, &req);
|
taosArrayPush(deleteReq->deleteReqs, &req);
|
||||||
|
|
|
@ -175,6 +175,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int vgId = TD_VID(pWriter->pTq->pVnode);
|
||||||
|
|
||||||
taosMemoryFree(pWriter);
|
taosMemoryFree(pWriter);
|
||||||
*ppWriter = NULL;
|
*ppWriter = NULL;
|
||||||
|
|
||||||
|
@ -186,7 +188,7 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
|
tqError("vgId:%d, tq snapshot writer close failed since %s", vgId, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue