refactor: adjust dnode logs
This commit is contained in:
parent
f88e536b6b
commit
24feb934eb
|
@ -38,13 +38,6 @@ typedef struct SMgmtWrapper SMgmtWrapper;
|
|||
#define InChildProc(ptype) (ptype & CHILD_PROC)
|
||||
#define InParentProc(ptype) (ptype & PARENT_PROC)
|
||||
|
||||
typedef enum {
|
||||
PROC_FUNC_REQ = 1,
|
||||
PROC_FUNC_RSP = 2,
|
||||
PROC_FUNC_REGIST = 3,
|
||||
PROC_FUNC_RELEASE = 4,
|
||||
} EProcFuncType;
|
||||
|
||||
typedef struct {
|
||||
int32_t head;
|
||||
int32_t tail;
|
||||
|
|
|
@ -135,7 +135,6 @@ static void dmClearVars(SDnode *pDnode) {
|
|||
taosThreadMutexDestroy(&pDnode->mutex);
|
||||
memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
|
||||
taosMemoryFree(pDnode);
|
||||
dDebug("dnode memory is cleared, data:%p", pDnode);
|
||||
}
|
||||
|
||||
SDnode *dmCreate(const SDnodeOpt *pOption) {
|
||||
|
|
|
@ -131,7 +131,7 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (handle != 0 && ftype == PROC_FUNC_REQ) {
|
||||
if (handle != 0 && ftype == DND_FUNC_REQ) {
|
||||
if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) {
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
return -1;
|
||||
|
@ -185,8 +185,8 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
|
|||
taosThreadMutexUnlock(&queue->mutex);
|
||||
tsem_post(&queue->sem);
|
||||
|
||||
dTrace("node:%s, push msg:%p:%d cont:%p%d handle:%p, ftype:%d pos:%d remain:%d", queue->name, pHead, headLen, pBody,
|
||||
bodyLen, (void *)handle, ftype, pos, queue->items);
|
||||
dTrace("node:%s, push msg:%p %d cont:%p %d handle:%p, ftype:%s pos:%d remain:%d", queue->name, pHead, headLen, pBody,
|
||||
bodyLen, (void *)handle, dmFuncStr(ftype), pos, queue->items);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -269,8 +269,8 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
|
|||
*pBodyLen = rawBodyLen;
|
||||
*pFuncType = (EProcFuncType)ftype;
|
||||
|
||||
dTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", queue->name, pos, ftype, queue->items,
|
||||
rawHeadLen, pHead, rawBodyLen, pBody);
|
||||
dTrace("node:%s, pop msg:%p %d body:%p %d, ftype:%s pos:%d remain:%d", queue->name, pHead, rawHeadLen, pBody,
|
||||
rawBodyLen, dmFuncStr(ftype), pos, queue->items);
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -312,7 +312,7 @@ static void *dmConsumChildQueue(void *param) {
|
|||
int32_t bodyLen = 0;
|
||||
int32_t numOfMsgs = 0;
|
||||
int32_t code = 0;
|
||||
EProcFuncType ftype = PROC_FUNC_REQ;
|
||||
EProcFuncType ftype = DND_FUNC_REQ;
|
||||
SNodeMsg *pReq = NULL;
|
||||
|
||||
dDebug("node:%s, start to consume from cqueue", proc->name);
|
||||
|
@ -329,7 +329,7 @@ static void *dmConsumChildQueue(void *param) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (ftype != PROC_FUNC_REQ) {
|
||||
if (ftype != DND_FUNC_REQ) {
|
||||
dFatal("node:%s, msg:%p from cqueue, invalid ftype:%d", proc->name, pHead, ftype);
|
||||
taosFreeQitem(pHead);
|
||||
rpcFreeCont(pBody);
|
||||
|
@ -347,7 +347,7 @@ static void *dmConsumChildQueue(void *param) {
|
|||
.pCont = pReq->pRsp,
|
||||
.contLen = pReq->rspLen,
|
||||
};
|
||||
dmPutToProcPQueue(proc, &rspMsg, sizeof(SRpcMsg), rspMsg.pCont, rspMsg.contLen, PROC_FUNC_RSP);
|
||||
dmPutToProcPQueue(proc, &rspMsg, sizeof(SRpcMsg), rspMsg.pCont, rspMsg.contLen, DND_FUNC_RSP);
|
||||
taosFreeQitem(pHead);
|
||||
rpcFreeCont(pBody);
|
||||
rpcFreeCont(rspMsg.pCont);
|
||||
|
@ -368,7 +368,7 @@ static void *dmConsumParentQueue(void *param) {
|
|||
int32_t bodyLen = 0;
|
||||
int32_t numOfMsgs = 0;
|
||||
int32_t code = 0;
|
||||
EProcFuncType ftype = PROC_FUNC_REQ;
|
||||
EProcFuncType ftype = DND_FUNC_REQ;
|
||||
SRpcMsg *pRsp = NULL;
|
||||
|
||||
dDebug("node:%s, start to consume from pqueue", proc->name);
|
||||
|
@ -385,18 +385,18 @@ static void *dmConsumParentQueue(void *param) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (ftype == PROC_FUNC_RSP) {
|
||||
if (ftype == DND_FUNC_RSP) {
|
||||
pRsp = pHead;
|
||||
pRsp->pCont = pBody;
|
||||
dTrace("node:%s, rsp msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
||||
dmRemoveProcRpcHandle(proc, pRsp->handle);
|
||||
rpcSendResponse(pRsp);
|
||||
} else if (ftype == PROC_FUNC_REGIST) {
|
||||
} else if (ftype == DND_FUNC_REGIST) {
|
||||
pRsp = pHead;
|
||||
dTrace("node:%s, regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
||||
rpcRegisterBrokenLinkArg(pRsp);
|
||||
rpcFreeCont(pBody);
|
||||
} else if (ftype == PROC_FUNC_RELEASE) {
|
||||
} else if (ftype == DND_FUNC_RELEASE) {
|
||||
pRsp = pHead;
|
||||
dTrace("node:%s, release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
||||
dmRemoveProcRpcHandle(proc, pRsp->handle);
|
||||
|
|
|
@ -138,9 +138,9 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
}
|
||||
|
||||
if (InParentProc(pWrapper->proc.ptype)) {
|
||||
dTrace("msg:%p, put into cqueue, handle:%p ref:%" PRId64, pMsg, pRpc->handle, pRpc->refId);
|
||||
dTrace("msg:%p, put into cqueue, handle:%p refId:%" PRId64, pMsg, pRpc->handle, pRpc->refId);
|
||||
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen,
|
||||
(isReq && (pRpc->code == 0)) ? pRpc->handle : NULL, pRpc->refId, PROC_FUNC_REQ);
|
||||
(isReq && (pRpc->code == 0)) ? pRpc->handle : NULL, pRpc->refId, DND_FUNC_REQ);
|
||||
} else {
|
||||
code = dmProcessNodeMsg(pWrapper, pMsg);
|
||||
}
|
||||
|
@ -277,7 +277,7 @@ static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
|
|||
if (!InChildProc(pWrapper->proc.ptype)) {
|
||||
dmSendRpcRsp(pWrapper->pDnode, pRsp);
|
||||
} else {
|
||||
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
|
||||
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -295,7 +295,7 @@ static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp
|
|||
rsp.refId = pRsp->refId;
|
||||
rpcSendResponse(&rsp);
|
||||
} else {
|
||||
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
|
||||
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -303,7 +303,7 @@ static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg
|
|||
if (!InChildProc(pWrapper->proc.ptype)) {
|
||||
rpcRegisterBrokenLinkArg(pMsg);
|
||||
} else {
|
||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_FUNC_REGIST);
|
||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -312,7 +312,7 @@ static inline void dmReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t
|
|||
rpcReleaseHandle(handle, type);
|
||||
} else {
|
||||
SRpcMsg msg = {.handle = handle, .code = type};
|
||||
dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_FUNC_RELEASE);
|
||||
dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -80,6 +80,13 @@ typedef enum {
|
|||
DND_PROC_TEST,
|
||||
} EDndProcType;
|
||||
|
||||
typedef enum {
|
||||
DND_FUNC_REQ = 1,
|
||||
DND_FUNC_RSP = 2,
|
||||
DND_FUNC_REGIST = 3,
|
||||
DND_FUNC_RELEASE = 4,
|
||||
} EProcFuncType;
|
||||
|
||||
typedef int32_t (*ProcessCreateNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
|
||||
typedef int32_t (*ProcessDropNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
|
||||
typedef bool (*IsNodeRequiredFp)(struct SDnode *pDnode, EDndNodeType ntype);
|
||||
|
@ -157,6 +164,7 @@ const char *dmNodeProcName(EDndNodeType ntype);
|
|||
const char *dmNodeName(EDndNodeType ntype);
|
||||
const char *dmEventStr(EDndEvent etype);
|
||||
const char *dmProcStr(EDndProcType ptype);
|
||||
const char *dmFuncStr(EProcFuncType etype);
|
||||
void *dmSetMgmtHandle(SArray *pArray, tmsg_t msgType, void *nodeMsgFp, bool needCheckVgId);
|
||||
void dmGetMonitorSystemInfo(SMonSysInfo *pInfo);
|
||||
|
||||
|
|
|
@ -108,6 +108,21 @@ const char *dmProcStr(EDndProcType etype) {
|
|||
}
|
||||
}
|
||||
|
||||
const char *dmFuncStr(EProcFuncType etype) {
|
||||
switch (etype) {
|
||||
case DND_FUNC_REQ:
|
||||
return "req";
|
||||
case DND_FUNC_RSP:
|
||||
return "rsp";
|
||||
case DND_FUNC_REGIST:
|
||||
return "regist";
|
||||
case DND_FUNC_RELEASE:
|
||||
return "release";
|
||||
default:
|
||||
return "UNKNOWN";
|
||||
}
|
||||
}
|
||||
|
||||
void *dmSetMgmtHandle(SArray *pArray, tmsg_t msgType, void *nodeMsgFp, bool needCheckVgId) {
|
||||
SMgmtHandle handle = {
|
||||
.msgType = msgType,
|
||||
|
|
|
@ -120,20 +120,20 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) {
|
|||
SProc *cproc = dmInitProc(&cfg);
|
||||
ASSERT_NE(cproc, nullptr);
|
||||
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_RSP), 0);
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_REGIST), 0);
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_RELEASE), 0);
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, NULL, 12, body, 0, 0, 0, PROC_FUNC_REQ), 0);
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_REQ), 0);
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, &head, shm.size, body, 0, 0, 0, PROC_FUNC_REQ), 0);
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, shm.size, 0, 0, PROC_FUNC_REQ), 0);
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, DND_FUNC_RSP), 0);
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, DND_FUNC_REGIST), 0);
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, DND_FUNC_RELEASE), 0);
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, NULL, 12, body, 0, 0, 0, DND_FUNC_REQ), 0);
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, DND_FUNC_REQ), 0);
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, &head, shm.size, body, 0, 0, 0, DND_FUNC_REQ), 0);
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, shm.size, 0, 0, DND_FUNC_REQ), 0);
|
||||
|
||||
for (int32_t j = 0; j < 1000; j++) {
|
||||
int32_t i = 0;
|
||||
for (i = 0; i < 20; ++i) {
|
||||
ASSERT_EQ(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, 0, 0, PROC_FUNC_REQ), 0);
|
||||
ASSERT_EQ(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, 0, 0, DND_FUNC_REQ), 0);
|
||||
}
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, 0, 0, PROC_FUNC_REQ), 0);
|
||||
ASSERT_NE(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, 0, 0, DND_FUNC_REQ), 0);
|
||||
|
||||
cfg.isChild = true;
|
||||
cfg.name = "1235_p";
|
||||
|
@ -186,7 +186,7 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) {
|
|||
for (int32_t j = 0; j < 1000; j++) {
|
||||
int32_t i = 0;
|
||||
for (i = 0; i < 20; ++i) {
|
||||
dmPutToProcPQueue(pproc, &head, sizeof(STestMsg), body, i, PROC_FUNC_REQ);
|
||||
dmPutToProcPQueue(pproc, &head, sizeof(STestMsg), body, i, DND_FUNC_REQ);
|
||||
}
|
||||
|
||||
dmRunProc(cproc);
|
||||
|
@ -236,7 +236,7 @@ TEST_F(UtilTesProc, 03_Handle) {
|
|||
int32_t i = 0;
|
||||
for (i = 0; i < 20; ++i) {
|
||||
head.handle = (void *)((int64_t)i);
|
||||
ASSERT_EQ(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), i, PROC_FUNC_REQ), 0);
|
||||
ASSERT_EQ(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), i, DND_FUNC_REQ), 0);
|
||||
}
|
||||
|
||||
cfg.isChild = true;
|
||||
|
|
Loading…
Reference in New Issue