refactor: multi-process test mode
This commit is contained in:
parent
46d566d345
commit
23707bfcb9
|
@ -27,16 +27,15 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct SMgmtWrapper SMgmtWrapper;
|
typedef struct SMgmtWrapper SMgmtWrapper;
|
||||||
|
|
||||||
#define SINGLE_PROC 0
|
#define SINGLE_PROC 0
|
||||||
#define CHILD_PROC 1
|
#define CHILD_PROC 1
|
||||||
#define PARENT_PROC 2
|
#define PARENT_PROC 2
|
||||||
#define TEST_PROC 3
|
#define TEST_PROC 3
|
||||||
#define OnlyInSingleProc(ptype) (ptype == SINGLE_PROC)
|
#define OnlyInSingleProc(wrapper) ((wrapper)->proc.ptype == SINGLE_PROC)
|
||||||
#define OnlyInChildProc(ptype) (ptype == CHILD_PROC)
|
#define OnlyInChildProc(wrapper) ((wrapper)->proc.ptype == CHILD_PROC)
|
||||||
#define OnlyInParentProc(ptype) (ptype == PARENT_PROC)
|
#define OnlyInParentProc(wrapper) ((wrapper)->proc.ptype == PARENT_PROC)
|
||||||
#define OnlyInTestProc(ptype) (ptype == TEST_PROC)
|
#define InChildProc(wrapper) ((wrapper)->proc.ptype & CHILD_PROC)
|
||||||
#define InChildProc(ptype) (ptype & CHILD_PROC)
|
#define InParentProc(wrapper) ((wrapper)->proc.ptype & PARENT_PROC)
|
||||||
#define InParentProc(ptype) (ptype & PARENT_PROC)
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t head;
|
int32_t head;
|
||||||
|
|
|
@ -164,7 +164,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OnlyInSingleProc(pDnode->ptype) || InParentProc(pDnode->ptype)) {
|
if (pDnode->ptype == SINGLE_PROC || (pDnode->ptype & PARENT_PROC)) {
|
||||||
pDnode->lockfile = dmCheckRunning(tsDataDir);
|
pDnode->lockfile = dmCheckRunning(tsDataDir);
|
||||||
if (pDnode->lockfile == NULL) {
|
if (pDnode->lockfile == NULL) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
@ -231,7 +231,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
taosRLockLatch(&pWrapper->latch);
|
taosRLockLatch(&pWrapper->latch);
|
||||||
if (pWrapper->deployed || (InParentProc(pWrapper->proc.ptype) && pWrapper->required)) {
|
if (pWrapper->deployed || (InParentProc(pWrapper) && pWrapper->required)) {
|
||||||
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
||||||
dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount);
|
dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount);
|
||||||
} else {
|
} else {
|
||||||
|
@ -276,7 +276,6 @@ void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||||
rsp.contLen = pReq->contLen;
|
rsp.contLen = pReq->contLen;
|
||||||
}
|
}
|
||||||
rpcSendResponse(&rsp);
|
rpcSendResponse(&rsp);
|
||||||
rpcFreeCont(pReq->pCont);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pReq) {
|
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pReq) {
|
||||||
|
@ -304,5 +303,4 @@ void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pReq) {
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
rpcSendResponse(&rspMsg);
|
rpcSendResponse(&rspMsg);
|
||||||
rpcFreeCont(pReq->pCont);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,11 +75,11 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
SMgmtOutputOpt output = {0};
|
SMgmtOutputOpt output = {0};
|
||||||
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
||||||
|
|
||||||
if (pWrapper->ntype == DNODE || InChildProc(pWrapper->proc.ptype)) {
|
if (pWrapper->ntype == DNODE || InChildProc(pWrapper)) {
|
||||||
tmsgSetDefaultMsgCb(&input.msgCb);
|
tmsgSetDefaultMsgCb(&input.msgCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OnlyInSingleProc(pWrapper->proc.ptype)) {
|
if (OnlyInSingleProc(pWrapper)) {
|
||||||
dInfo("node:%s, start to open", pWrapper->name);
|
dInfo("node:%s, start to open", pWrapper->name);
|
||||||
if ((*pWrapper->func.openFp)(&input, &output) != 0) {
|
if ((*pWrapper->func.openFp)(&input, &output) != 0) {
|
||||||
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
|
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
|
||||||
|
@ -89,7 +89,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
pWrapper->deployed = true;
|
pWrapper->deployed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (InParentProc(pWrapper->proc.ptype)) {
|
if (InParentProc(pWrapper)) {
|
||||||
dDebug("node:%s, start to open", pWrapper->name);
|
dDebug("node:%s, start to open", pWrapper->name);
|
||||||
if (dmCreateShm(pWrapper) != 0) {
|
if (dmCreateShm(pWrapper) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -98,7 +98,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OnlyInParentProc(pWrapper->proc.ptype)) {
|
if (OnlyInParentProc(pWrapper)) {
|
||||||
if (dmInitProc(pWrapper) != 0) {
|
if (dmInitProc(pWrapper) != 0) {
|
||||||
dError("node:%s, failed to init proc since %s", pWrapper->name, terrstr());
|
dError("node:%s, failed to init proc since %s", pWrapper->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -118,7 +118,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
dDebug("node:%s, has been opened in parent process", pWrapper->name);
|
dDebug("node:%s, has been opened in parent process", pWrapper->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (InChildProc(pWrapper->proc.ptype)) {
|
if (InChildProc(pWrapper)) {
|
||||||
dDebug("node:%s, start to open", pWrapper->name);
|
dDebug("node:%s, start to open", pWrapper->name);
|
||||||
if ((*pWrapper->func.openFp)(&input, &output) != 0) {
|
if ((*pWrapper->func.openFp)(&input, &output) != 0) {
|
||||||
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
|
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
|
||||||
|
@ -143,7 +143,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dmStartNode(SMgmtWrapper *pWrapper) {
|
int32_t dmStartNode(SMgmtWrapper *pWrapper) {
|
||||||
if (OnlyInParentProc(pWrapper->proc.ptype)) return 0;
|
if (OnlyInParentProc(pWrapper)) return 0;
|
||||||
if (pWrapper->func.startFp != NULL) {
|
if (pWrapper->func.startFp != NULL) {
|
||||||
dDebug("node:%s, start to start", pWrapper->name);
|
dDebug("node:%s, start to start", pWrapper->name);
|
||||||
if ((*pWrapper->func.startFp)(pWrapper->pMgmt) != 0) {
|
if ((*pWrapper->func.startFp)(pWrapper->pMgmt) != 0) {
|
||||||
|
@ -173,7 +173,7 @@ void dmCloseNode(SMgmtWrapper *pWrapper) {
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OnlyInParentProc(pWrapper->proc.ptype)) {
|
if (OnlyInParentProc(pWrapper)) {
|
||||||
int32_t pid = pWrapper->proc.pid;
|
int32_t pid = pWrapper->proc.pid;
|
||||||
if (pid > 0 && taosProcExist(pid)) {
|
if (pid > 0 && taosProcExist(pid)) {
|
||||||
dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pid);
|
dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pid);
|
||||||
|
@ -191,7 +191,7 @@ void dmCloseNode(SMgmtWrapper *pWrapper) {
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pWrapper->latch);
|
taosWUnLockLatch(&pWrapper->latch);
|
||||||
|
|
||||||
if (!OnlyInSingleProc(pWrapper->proc.ptype)) {
|
if (!OnlyInSingleProc(pWrapper)) {
|
||||||
dmCleanupProc(pWrapper);
|
dmCleanupProc(pWrapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,7 +242,7 @@ static void dmCloseNodes(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmWatchNodes(SDnode *pDnode) {
|
static void dmWatchNodes(SDnode *pDnode) {
|
||||||
if (!OnlyInParentProc(pDnode->ptype)) return;
|
if (pDnode->ptype != PARENT_PROC) return;
|
||||||
if (pDnode->rtype == NODE_END) return;
|
if (pDnode->rtype == NODE_END) return;
|
||||||
|
|
||||||
taosThreadMutexLock(&pDnode->mutex);
|
taosThreadMutexLock(&pDnode->mutex);
|
||||||
|
@ -251,7 +251,7 @@ static void dmWatchNodes(SDnode *pDnode) {
|
||||||
SProc *proc = &pWrapper->proc;
|
SProc *proc = &pWrapper->proc;
|
||||||
|
|
||||||
if (!pWrapper->required) continue;
|
if (!pWrapper->required) continue;
|
||||||
if (!OnlyInParentProc(proc->ptype)) continue;
|
if (!OnlyInParentProc(pWrapper)) continue;
|
||||||
|
|
||||||
if (proc->pid <= 0 || !taosProcExist(proc->pid)) {
|
if (proc->pid <= 0 || !taosProcExist(proc->pid)) {
|
||||||
dWarn("node:%s, process:%d is killed and needs to restart", pWrapper->name, proc->pid);
|
dWarn("node:%s, process:%d is killed and needs to restart", pWrapper->name, proc->pid);
|
||||||
|
@ -274,7 +274,7 @@ int32_t dmRunDnode(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (!pDnode->stop) {
|
if (pDnode->stop) {
|
||||||
dInfo("dnode is about to stop");
|
dInfo("dnode is about to stop");
|
||||||
dmSetStatus(pDnode, DND_STAT_STOPPED);
|
dmSetStatus(pDnode, DND_STAT_STOPPED);
|
||||||
dmStopNodes(pDnode);
|
dmStopNodes(pDnode);
|
||||||
|
|
|
@ -67,7 +67,7 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (InParentProc(proc->ptype)) {
|
if (proc->ptype & DND_PROC_PARENT) {
|
||||||
if (dmInitProcMutex(queue) != 0) {
|
if (dmInitProcMutex(queue) != 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -315,7 +315,7 @@ static void *dmConsumChildQueue(void *param) {
|
||||||
int32_t numOfMsgs = 0;
|
int32_t numOfMsgs = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
EProcFuncType ftype = DND_FUNC_REQ;
|
EProcFuncType ftype = DND_FUNC_REQ;
|
||||||
SRpcMsg *pReq = NULL;
|
SRpcMsg *pReq = NULL;
|
||||||
|
|
||||||
dDebug("node:%s, start to consume from cqueue", proc->name);
|
dDebug("node:%s, start to consume from cqueue", proc->name);
|
||||||
do {
|
do {
|
||||||
|
@ -392,12 +392,14 @@ static void *dmConsumParentQueue(void *param) {
|
||||||
rpcSendResponse(pRsp);
|
rpcSendResponse(pRsp);
|
||||||
} else if (ftype == DND_FUNC_REGIST) {
|
} else if (ftype == DND_FUNC_REGIST) {
|
||||||
pRsp = pHead;
|
pRsp = pHead;
|
||||||
dTrace("node:%s, get regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->info.handle);
|
dTrace("node:%s, get regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code,
|
||||||
|
pRsp->info.handle);
|
||||||
rpcRegisterBrokenLinkArg(pRsp);
|
rpcRegisterBrokenLinkArg(pRsp);
|
||||||
rpcFreeCont(pBody);
|
rpcFreeCont(pBody);
|
||||||
} else if (ftype == DND_FUNC_RELEASE) {
|
} else if (ftype == DND_FUNC_RELEASE) {
|
||||||
pRsp = pHead;
|
pRsp = pHead;
|
||||||
dTrace("node:%s, get release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->info.handle);
|
dTrace("node:%s, get release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code,
|
||||||
|
pRsp->info.handle);
|
||||||
dmRemoveProcRpcHandle(proc, pRsp->info.handle);
|
dmRemoveProcRpcHandle(proc, pRsp->info.handle);
|
||||||
rpcReleaseHandle(pRsp->info.handle, (int8_t)pRsp->code);
|
rpcReleaseHandle(pRsp->info.handle, (int8_t)pRsp->code);
|
||||||
rpcFreeCont(pBody);
|
rpcFreeCont(pBody);
|
||||||
|
@ -417,7 +419,7 @@ int32_t dmRunProc(SProc *proc) {
|
||||||
taosThreadAttrInit(&thAttr);
|
taosThreadAttrInit(&thAttr);
|
||||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
if (InParentProc(proc->ptype)) {
|
if (proc->ptype & DND_PROC_PARENT) {
|
||||||
if (taosThreadCreate(&proc->pthread, &thAttr, dmConsumParentQueue, proc) != 0) {
|
if (taosThreadCreate(&proc->pthread, &thAttr, dmConsumParentQueue, proc) != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
dError("node:%s, failed to create pthread since %s", proc->name, terrstr());
|
dError("node:%s, failed to create pthread since %s", proc->name, terrstr());
|
||||||
|
@ -426,7 +428,7 @@ int32_t dmRunProc(SProc *proc) {
|
||||||
dDebug("node:%s, thread:%" PRId64 " is created to consume pqueue", proc->name, proc->pthread);
|
dDebug("node:%s, thread:%" PRId64 " is created to consume pqueue", proc->name, proc->pthread);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (InChildProc(proc->ptype)) {
|
if (proc->ptype & DND_PROC_CHILD) {
|
||||||
if (taosThreadCreate(&proc->cthread, &thAttr, dmConsumChildQueue, proc) != 0) {
|
if (taosThreadCreate(&proc->cthread, &thAttr, dmConsumChildQueue, proc) != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
dError("node:%s, failed to create cthread since %s", proc->name, terrstr());
|
dError("node:%s, failed to create cthread since %s", proc->name, terrstr());
|
||||||
|
|
|
@ -52,7 +52,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SRpcMsg *pMsg = NULL;
|
SRpcMsg *pMsg = NULL;
|
||||||
bool needRelease = false;
|
bool needRelease = false;
|
||||||
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
|
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
|
||||||
SMgmtWrapper *pWrapper = NULL;
|
SMgmtWrapper *pWrapper = NULL;
|
||||||
|
|
||||||
dTrace("msg:%s is received, handle:%p cont:%p len:%d code:0x%04x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
|
dTrace("msg:%s is received, handle:%p cont:%p len:%d code:0x%04x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
|
||||||
|
@ -66,32 +66,31 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
|
|
||||||
if (pRpc->msgType == TDMT_DND_NET_TEST) {
|
if (pRpc->msgType == TDMT_DND_NET_TEST) {
|
||||||
dmProcessNetTestReq(pDnode, pRpc);
|
dmProcessNetTestReq(pDnode, pRpc);
|
||||||
return;
|
goto _OVER_JUST_FREE;
|
||||||
} else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) {
|
} else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) {
|
||||||
code = qWorkerProcessFetchRsp(NULL, NULL, pRpc);
|
qWorkerProcessFetchRsp(NULL, NULL, pRpc);
|
||||||
pRpc->pCont = NULL; // will be freed in qworker
|
goto _OVER_JUST_FREE;
|
||||||
return;
|
|
||||||
} else {
|
} else {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDnode->status != DND_STAT_RUNNING) {
|
if (pDnode->status != DND_STAT_RUNNING) {
|
||||||
if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
|
if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
|
||||||
dmProcessServerStartupStatus(pDnode, pRpc);
|
dmProcessServerStartupStatus(pDnode, pRpc);
|
||||||
|
goto _OVER_JUST_FREE;
|
||||||
} else {
|
} else {
|
||||||
SRpcMsg rspMsg = {.info = pRpc->info, .code = TSDB_CODE_APP_NOT_READY};
|
terrno = TSDB_CODE_APP_NOT_READY;
|
||||||
rpcSendResponse(&rspMsg);
|
goto _OVER_RSP_FREE;
|
||||||
}
|
}
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsReq(pRpc) && pRpc->pCont == NULL) {
|
if (IsReq(pRpc) && pRpc->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||||
goto _OVER;
|
goto _OVER_RSP_FREE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle->defaultNtype == NODE_END) {
|
if (pHandle->defaultNtype == NODE_END) {
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
goto _OVER;
|
goto _OVER_RSP_FREE;
|
||||||
} else {
|
} else {
|
||||||
pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
|
pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
|
||||||
if (pHandle->needCheckVgId) {
|
if (pHandle->needCheckVgId) {
|
||||||
|
@ -103,18 +102,16 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
} else if (vgId == MNODE_HANDLE) {
|
} else if (vgId == MNODE_HANDLE) {
|
||||||
pWrapper = &pDnode->wrappers[MNODE];
|
pWrapper = &pDnode->wrappers[MNODE];
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||||
goto _OVER;
|
goto _OVER_RSP_FREE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dmMarkWrapper(pWrapper) != 0) {
|
if (dmMarkWrapper(pWrapper) != 0) {
|
||||||
goto _OVER;
|
goto _OVER_RSP_FREE;
|
||||||
} else {
|
} else {
|
||||||
needRelease = true;
|
needRelease = true;
|
||||||
pRpc->info.wrapper = pWrapper;
|
pRpc->info.wrapper = pWrapper;
|
||||||
|
@ -129,7 +126,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (InParentProc(pWrapper->proc.ptype)) {
|
if (InParentProc(pWrapper)) {
|
||||||
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen,
|
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen,
|
||||||
(IsReq(pRpc) && (pRpc->code == 0)) ? pRpc->info.handle : NULL, pRpc->info.refId,
|
(IsReq(pRpc) && (pRpc->code == 0)) ? pRpc->info.handle : NULL, pRpc->info.refId,
|
||||||
DND_FUNC_REQ);
|
DND_FUNC_REQ);
|
||||||
|
@ -139,7 +136,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (pWrapper != NULL && InParentProc(pWrapper->proc.ptype)) {
|
if (pWrapper != NULL && InParentProc(pWrapper)) {
|
||||||
dTrace("msg:%p, is freed after push to cqueue", pMsg);
|
dTrace("msg:%p, is freed after push to cqueue", pMsg);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
rpcFreeCont(pRpc->pCont);
|
rpcFreeCont(pRpc->pCont);
|
||||||
|
@ -166,6 +163,16 @@ _OVER:
|
||||||
if (needRelease) {
|
if (needRelease) {
|
||||||
dmReleaseWrapper(pWrapper);
|
dmReleaseWrapper(pWrapper);
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
|
|
||||||
|
_OVER_JUST_FREE:
|
||||||
|
rpcFreeCont(pRpc->pCont);
|
||||||
|
return;
|
||||||
|
|
||||||
|
_OVER_RSP_FREE:
|
||||||
|
rpcFreeCont(pRpc->pCont);
|
||||||
|
SRpcMsg simpleRsp = {.code = terrno, .info = pRpc->info};
|
||||||
|
rpcSendResponse(&simpleRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dmInitMsgHandle(SDnode *pDnode) {
|
int32_t dmInitMsgHandle(SDnode *pDnode) {
|
||||||
|
@ -177,8 +184,8 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
|
||||||
if (pArray == NULL) return -1;
|
if (pArray == NULL) return -1;
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||||
SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
|
SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
|
||||||
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
|
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
|
||||||
if (pMgmt->needCheckVgId) {
|
if (pMgmt->needCheckVgId) {
|
||||||
pHandle->needCheckVgId = pMgmt->needCheckVgId;
|
pHandle->needCheckVgId = pMgmt->needCheckVgId;
|
||||||
}
|
}
|
||||||
|
@ -249,7 +256,7 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) {
|
||||||
|
|
||||||
static inline void dmSendRsp(const SRpcMsg *pMsg) {
|
static inline void dmSendRsp(const SRpcMsg *pMsg) {
|
||||||
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
||||||
if (InChildProc(pWrapper->proc.ptype)) {
|
if (InChildProc(pWrapper)) {
|
||||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_RSP);
|
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_RSP);
|
||||||
} else {
|
} else {
|
||||||
if (pMsg->code == TSDB_CODE_NODE_REDIRECT) {
|
if (pMsg->code == TSDB_CODE_NODE_REDIRECT) {
|
||||||
|
@ -262,7 +269,7 @@ static inline void dmSendRsp(const SRpcMsg *pMsg) {
|
||||||
|
|
||||||
static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
|
static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
|
||||||
SMgmtWrapper *pWrapper = pRsp->info.wrapper;
|
SMgmtWrapper *pWrapper = pRsp->info.wrapper;
|
||||||
if (InChildProc(pWrapper->proc.ptype)) {
|
if (InChildProc(pWrapper)) {
|
||||||
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
|
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
|
||||||
} else {
|
} else {
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
|
@ -280,7 +287,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe
|
||||||
|
|
||||||
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
|
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
|
||||||
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
||||||
if (InChildProc(pWrapper->proc.ptype)) {
|
if (InChildProc(pWrapper)) {
|
||||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST);
|
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST);
|
||||||
} else {
|
} else {
|
||||||
rpcRegisterBrokenLinkArg(pMsg);
|
rpcRegisterBrokenLinkArg(pMsg);
|
||||||
|
@ -289,7 +296,7 @@ static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
|
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
|
||||||
SMgmtWrapper *pWrapper = pHandle->wrapper;
|
SMgmtWrapper *pWrapper = pHandle->wrapper;
|
||||||
if (InChildProc(pWrapper->proc.ptype)) {
|
if (InChildProc(pWrapper)) {
|
||||||
SRpcMsg msg = {.code = type, .info = *pHandle};
|
SRpcMsg msg = {.code = type, .info = *pHandle};
|
||||||
dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE);
|
dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -495,6 +495,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||||
qProcessFetchRsp(NULL, pMsg, NULL);
|
qProcessFetchRsp(NULL, pMsg, NULL);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue