refactor: adjust dnode logs
This commit is contained in:
parent
21a9367d67
commit
f88e536b6b
|
@ -191,9 +191,9 @@ static int32_t dmRunDnode() {
|
||||||
dmSetSignalHandle();
|
dmSetSignalHandle();
|
||||||
}
|
}
|
||||||
|
|
||||||
dInfo("start the service");
|
dInfo("start to run dnode");
|
||||||
int32_t code = dmRun(pDnode);
|
int32_t code = dmRun(pDnode);
|
||||||
dInfo("start shutting down the service");
|
dInfo("shutting down the service");
|
||||||
|
|
||||||
global.pDnode = NULL;
|
global.pDnode = NULL;
|
||||||
dmClose(pDnode);
|
dmClose(pDnode);
|
||||||
|
|
|
@ -33,7 +33,6 @@ static void dmStopMgmt(SDnodeMgmt *pMgmt) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||||
dInfo("dnode-mgmt start to init");
|
|
||||||
SDnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeMgmt));
|
SDnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeMgmt));
|
||||||
if (pMgmt == NULL) {
|
if (pMgmt == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -58,15 +57,12 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pOutput->pMgmt = pMgmt;
|
pOutput->pMgmt = pMgmt;
|
||||||
dInfo("dnode-mgmt is initialized");
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmCloseMgmt(SDnodeMgmt *pMgmt) {
|
static void dmCloseMgmt(SDnodeMgmt *pMgmt) {
|
||||||
dInfo("dnode-mgmt start to clean up");
|
|
||||||
dmStopWorker(pMgmt);
|
dmStopWorker(pMgmt);
|
||||||
taosMemoryFree(pMgmt);
|
taosMemoryFree(pMgmt);
|
||||||
dInfo("dnode-mgmt is cleaned up");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dmRequireMgmt(const SMgmtInputOpt *pInput, bool *required) {
|
static int32_t dmRequireMgmt(const SMgmtInputOpt *pInput, bool *required) {
|
||||||
|
|
|
@ -212,7 +212,7 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
|
||||||
}
|
}
|
||||||
|
|
||||||
dmReportStartup(pDnode, "dnode-transport", "initialized");
|
dmReportStartup(pDnode, "dnode-transport", "initialized");
|
||||||
dInfo("dnode is created, data:%p", pDnode);
|
dInfo("dnode is created, ptr:%p", pDnode);
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
@ -231,7 +231,7 @@ void dmClose(SDnode *pDnode) {
|
||||||
dmCleanupClient(pDnode);
|
dmCleanupClient(pDnode);
|
||||||
dmCleanupServer(pDnode);
|
dmCleanupServer(pDnode);
|
||||||
dmClearVars(pDnode);
|
dmClearVars(pDnode);
|
||||||
dInfo("dnode is closed, data:%p", pDnode);
|
dInfo("dnode is closed, ptr:%p", pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
|
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
|
||||||
|
@ -254,7 +254,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
|
||||||
taosRLockLatch(&pWrapper->latch);
|
taosRLockLatch(&pWrapper->latch);
|
||||||
if (pWrapper->deployed) {
|
if (pWrapper->deployed) {
|
||||||
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
||||||
dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount);
|
dTrace("node:%s, is acquired, ref:%d", pWrapper->name, refCount);
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||||
pRetWrapper = NULL;
|
pRetWrapper = NULL;
|
||||||
|
@ -270,7 +270,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
|
||||||
taosRLockLatch(&pWrapper->latch);
|
taosRLockLatch(&pWrapper->latch);
|
||||||
if (pWrapper->deployed || (InParentProc(pWrapper->proc.ptype) && pWrapper->required)) {
|
if (pWrapper->deployed || (InParentProc(pWrapper->proc.ptype) && pWrapper->required)) {
|
||||||
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
||||||
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
|
dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount);
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -286,14 +286,14 @@ void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
|
||||||
taosRLockLatch(&pWrapper->latch);
|
taosRLockLatch(&pWrapper->latch);
|
||||||
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
|
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
|
||||||
taosRUnLockLatch(&pWrapper->latch);
|
taosRUnLockLatch(&pWrapper->latch);
|
||||||
dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount);
|
dTrace("node:%s, is released, ref:%d", pWrapper->name, refCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
|
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
|
||||||
SStartupInfo *pStartup = &pDnode->startup;
|
SStartupInfo *pStartup = &pDnode->startup;
|
||||||
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
|
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
|
||||||
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
|
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
|
||||||
dInfo("step:%s, %s", pStartup->name, pStartup->desc);
|
dDebug("step:%s, %s", pStartup->name, pStartup->desc);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc) {
|
void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc) {
|
||||||
|
|
|
@ -99,15 +99,17 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OnlyInSingleProc(pWrapper->proc.ptype)) {
|
if (OnlyInSingleProc(pWrapper->proc.ptype)) {
|
||||||
|
dInfo("node:%s, start to open", pWrapper->name);
|
||||||
if ((*pWrapper->func.openFp)(&input, &output) != 0) {
|
if ((*pWrapper->func.openFp)(&input, &output) != 0) {
|
||||||
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
|
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
dDebug("node:%s, has been opened", pWrapper->name);
|
dInfo("node:%s, has been opened", pWrapper->name);
|
||||||
pWrapper->deployed = true;
|
pWrapper->deployed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (InChildProc(pWrapper->proc.ptype)) {
|
if (InChildProc(pWrapper->proc.ptype)) {
|
||||||
|
dDebug("node:%s, start to open", pWrapper->name);
|
||||||
if ((*pWrapper->func.openFp)(&input, &output) != 0) {
|
if ((*pWrapper->func.openFp)(&input, &output) != 0) {
|
||||||
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
|
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -123,6 +125,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (InParentProc(pWrapper->proc.ptype)) {
|
if (InParentProc(pWrapper->proc.ptype)) {
|
||||||
|
dDebug("node:%s, start to open", pWrapper->name);
|
||||||
if (dmInitParentProc(pWrapper) != 0) {
|
if (dmInitParentProc(pWrapper) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -145,9 +148,13 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
|
|
||||||
int32_t dmStartNode(SMgmtWrapper *pWrapper) {
|
int32_t dmStartNode(SMgmtWrapper *pWrapper) {
|
||||||
if (OnlyInParentProc(pWrapper->proc.ptype)) return 0;
|
if (OnlyInParentProc(pWrapper->proc.ptype)) return 0;
|
||||||
if (pWrapper->func.startFp != NULL && (*pWrapper->func.startFp)(pWrapper->pMgmt) != 0) {
|
if (pWrapper->func.startFp != NULL) {
|
||||||
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
|
dDebug("node:%s, start to start", pWrapper->name);
|
||||||
return -1;
|
if ((*pWrapper->func.startFp)(pWrapper->pMgmt) != 0) {
|
||||||
|
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
dDebug("node:%s, has been started", pWrapper->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
dmReportStartup(pWrapper->pDnode, pWrapper->name, "started");
|
dmReportStartup(pWrapper->pDnode, pWrapper->name, "started");
|
||||||
|
@ -156,6 +163,7 @@ int32_t dmStartNode(SMgmtWrapper *pWrapper) {
|
||||||
|
|
||||||
void dmStopNode(SMgmtWrapper *pWrapper) {
|
void dmStopNode(SMgmtWrapper *pWrapper) {
|
||||||
if (pWrapper->func.stopFp != NULL && pWrapper->pMgmt != NULL) {
|
if (pWrapper->func.stopFp != NULL && pWrapper->pMgmt != NULL) {
|
||||||
|
dDebug("node:%s, start to stop", pWrapper->name);
|
||||||
(*pWrapper->func.stopFp)(pWrapper->pMgmt);
|
(*pWrapper->func.stopFp)(pWrapper->pMgmt);
|
||||||
dDebug("node:%s, has been stopped", pWrapper->name);
|
dDebug("node:%s, has been stopped", pWrapper->name);
|
||||||
}
|
}
|
||||||
|
|
|
@ -185,8 +185,8 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
|
||||||
taosThreadMutexUnlock(&queue->mutex);
|
taosThreadMutexUnlock(&queue->mutex);
|
||||||
tsem_post(&queue->sem);
|
tsem_post(&queue->sem);
|
||||||
|
|
||||||
dTrace("node:%s, push proc msg at pos:%d ftype:%d remain:%d handle:%p ref:%" PRId64 ", head:%d %p body:%d %p",
|
dTrace("node:%s, push msg:%p:%d cont:%p%d handle:%p, ftype:%d pos:%d remain:%d", queue->name, pHead, headLen, pBody,
|
||||||
queue->name, pos, ftype, queue->items, (void *)handle, handleRef, headLen, pHead, bodyLen, pBody);
|
bodyLen, (void *)handle, ftype, pos, queue->items);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -315,31 +315,31 @@ static void *dmConsumChildQueue(void *param) {
|
||||||
EProcFuncType ftype = PROC_FUNC_REQ;
|
EProcFuncType ftype = PROC_FUNC_REQ;
|
||||||
SNodeMsg *pReq = NULL;
|
SNodeMsg *pReq = NULL;
|
||||||
|
|
||||||
dDebug("node:%s, start to consume from child queue", proc->name);
|
dDebug("node:%s, start to consume from cqueue", proc->name);
|
||||||
do {
|
do {
|
||||||
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
|
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
|
||||||
if (numOfMsgs == 0) {
|
if (numOfMsgs == 0) {
|
||||||
dDebug("node:%s, get no msg from child queue and exit thread", proc->name);
|
dDebug("node:%s, get no msg from cueue and exit thread", proc->name);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numOfMsgs < 0) {
|
if (numOfMsgs < 0) {
|
||||||
dError("node:%s, get no msg from child queue since %s", proc->name, terrstr());
|
dError("node:%s, get no msg from cqueue since %s", proc->name, terrstr());
|
||||||
taosMsleep(1);
|
taosMsleep(1);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ftype != PROC_FUNC_REQ) {
|
if (ftype != PROC_FUNC_REQ) {
|
||||||
dFatal("node:%s, msg:%p from child queue, invalid ftype:%d", proc->name, pHead, ftype);
|
dFatal("node:%s, msg:%p from cqueue, invalid ftype:%d", proc->name, pHead, ftype);
|
||||||
taosFreeQitem(pHead);
|
taosFreeQitem(pHead);
|
||||||
rpcFreeCont(pBody);
|
rpcFreeCont(pBody);
|
||||||
} else {
|
} else {
|
||||||
dTrace("node:%s, msg:%p from child queue", proc->name, pHead);
|
dTrace("node:%s, msg:%p from cueue", proc->name, pHead);
|
||||||
pReq = pHead;
|
pReq = pHead;
|
||||||
pReq->rpcMsg.pCont = pBody;
|
pReq->rpcMsg.pCont = pBody;
|
||||||
code = dmProcessNodeMsg(pWrapper, pReq);
|
code = dmProcessNodeMsg(pWrapper, pReq);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
dError("node:%s, failed to process msg:%p since %s, put into parent queue", proc->name, pReq, terrstr());
|
dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pReq, terrstr());
|
||||||
SRpcMsg rspMsg = {
|
SRpcMsg rspMsg = {
|
||||||
.handle = pReq->rpcMsg.handle,
|
.handle = pReq->rpcMsg.handle,
|
||||||
.ahandle = pReq->rpcMsg.ahandle,
|
.ahandle = pReq->rpcMsg.ahandle,
|
||||||
|
@ -371,16 +371,16 @@ static void *dmConsumParentQueue(void *param) {
|
||||||
EProcFuncType ftype = PROC_FUNC_REQ;
|
EProcFuncType ftype = PROC_FUNC_REQ;
|
||||||
SRpcMsg *pRsp = NULL;
|
SRpcMsg *pRsp = NULL;
|
||||||
|
|
||||||
dDebug("node:%s, start to consume from parent queue", proc->name);
|
dDebug("node:%s, start to consume from pqueue", proc->name);
|
||||||
do {
|
do {
|
||||||
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
|
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
|
||||||
if (numOfMsgs == 0) {
|
if (numOfMsgs == 0) {
|
||||||
dDebug("node:%s, get no msg from parent queue and exit thread", proc->name);
|
dDebug("node:%s, get no msg from pqueue and exit thread", proc->name);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numOfMsgs < 0) {
|
if (numOfMsgs < 0) {
|
||||||
dError("node:%s, get no msg from parent queue since %s", proc->name, terrstr());
|
dError("node:%s, get no msg from pqueue since %s", proc->name, terrstr());
|
||||||
taosMsleep(1);
|
taosMsleep(1);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -388,22 +388,22 @@ static void *dmConsumParentQueue(void *param) {
|
||||||
if (ftype == PROC_FUNC_RSP) {
|
if (ftype == PROC_FUNC_RSP) {
|
||||||
pRsp = pHead;
|
pRsp = pHead;
|
||||||
pRsp->pCont = pBody;
|
pRsp->pCont = pBody;
|
||||||
dTrace("node:%s, rsp msg:%p from parent queue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
dTrace("node:%s, rsp msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
||||||
dmRemoveProcRpcHandle(proc, pRsp->handle);
|
dmRemoveProcRpcHandle(proc, pRsp->handle);
|
||||||
rpcSendResponse(pRsp);
|
rpcSendResponse(pRsp);
|
||||||
} else if (ftype == PROC_FUNC_REGIST) {
|
} else if (ftype == PROC_FUNC_REGIST) {
|
||||||
pRsp = pHead;
|
pRsp = pHead;
|
||||||
dTrace("node:%s, regist msg:%p from parent queue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
dTrace("node:%s, regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
||||||
rpcRegisterBrokenLinkArg(pRsp);
|
rpcRegisterBrokenLinkArg(pRsp);
|
||||||
rpcFreeCont(pBody);
|
rpcFreeCont(pBody);
|
||||||
} else if (ftype == PROC_FUNC_RELEASE) {
|
} else if (ftype == PROC_FUNC_RELEASE) {
|
||||||
pRsp = pHead;
|
pRsp = pHead;
|
||||||
dTrace("node:%s, release msg:%p from parent queue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
dTrace("node:%s, release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
||||||
dmRemoveProcRpcHandle(proc, pRsp->handle);
|
dmRemoveProcRpcHandle(proc, pRsp->handle);
|
||||||
rpcReleaseHandle(pRsp->handle, (int8_t)pRsp->code);
|
rpcReleaseHandle(pRsp->handle, (int8_t)pRsp->code);
|
||||||
rpcFreeCont(pBody);
|
rpcFreeCont(pBody);
|
||||||
} else {
|
} else {
|
||||||
dFatal("node:%s, msg:%p get from parent queue, invalid ftype:%d", proc->name, pHead, ftype);
|
dFatal("node:%s, msg:%p get from pqueue, invalid ftype:%d", proc->name, pHead, ftype);
|
||||||
rpcFreeCont(pBody);
|
rpcFreeCont(pBody);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -502,7 +502,7 @@ void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const vo
|
||||||
EProcFuncType ftype) {
|
EProcFuncType ftype) {
|
||||||
int32_t retry = 0;
|
int32_t retry = 0;
|
||||||
while (dmPushToProcQueue(proc, proc->pqueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) {
|
while (dmPushToProcQueue(proc, proc->pqueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) {
|
||||||
dWarn("node:%s, failed to put msg:%p to parent queue since %s, retry:%d", proc->name, pHead, terrstr(), retry);
|
dWarn("node:%s, failed to put msg:%p to pqueue since %s, retry:%d", proc->name, pHead, terrstr(), retry);
|
||||||
retry++;
|
retry++;
|
||||||
taosMsleep(retry);
|
taosMsleep(retry);
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,13 +121,13 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dTrace("msg:%s is received, handle:%p app:%p", TMSG_INFO(msgType), pRpc->handle, pRpc->ahandle);
|
||||||
if (dmMarkWrapper(pWrapper) != 0) {
|
if (dmMarkWrapper(pWrapper) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else {
|
} else {
|
||||||
needRelease = true;
|
needRelease = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("msg:%s is received, handle:%p app:%p", TMSG_INFO(msgType), pRpc->handle, pRpc->ahandle);
|
|
||||||
pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
|
pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
@ -138,7 +138,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (InParentProc(pWrapper->proc.ptype)) {
|
if (InParentProc(pWrapper->proc.ptype)) {
|
||||||
dTrace("msg:%p, put into child queue, handle:%p", pMsg, pRpc->handle);
|
dTrace("msg:%p, put into cqueue, handle:%p ref:%" PRId64, pMsg, pRpc->handle, pRpc->refId);
|
||||||
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen,
|
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen,
|
||||||
(isReq && (pRpc->code == 0)) ? pRpc->handle : NULL, pRpc->refId, PROC_FUNC_REQ);
|
(isReq && (pRpc->code == 0)) ? pRpc->handle : NULL, pRpc->refId, PROC_FUNC_REQ);
|
||||||
} else {
|
} else {
|
||||||
|
@ -148,7 +148,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (pWrapper != NULL && InParentProc(pWrapper->proc.ptype)) {
|
if (pWrapper != NULL && InParentProc(pWrapper->proc.ptype)) {
|
||||||
dTrace("msg:%p, freed in parent process", pMsg);
|
dTrace("msg:%p, is freed after push to cqueue", pMsg);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
rpcFreeCont(pRpc->pCont);
|
rpcFreeCont(pRpc->pCont);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue