refactor: enable multi-process mode

This commit is contained in:
Shengliang Guan 2022-05-15 23:26:55 +08:00
parent 47dde96404
commit 234325736c
4 changed files with 27 additions and 24 deletions

View File

@ -267,7 +267,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t code = 0;
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed || (InParentProc(pWrapper->proc.ptype) && pWrapper->required)) {
if (pWrapper->deployed /* || (OnlyInParentProc(pWrapper->proc.ptype) && pWrapper->required) */) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount);
} else {

View File

@ -73,7 +73,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
SMgmtOutputOpt output = {0};
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
if (pWrapper->ntype == DNODE || OnlyInChildProc(pWrapper->proc.ptype)) {
if (pWrapper->ntype == DNODE || InChildProc(pWrapper->proc.ptype)) {
tmsgSetDefaultMsgCb(&input.msgCb);
}

View File

@ -185,7 +185,7 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
taosThreadMutexUnlock(&queue->mutex);
tsem_post(&queue->sem);
dTrace("node:%s, push %s msg:%p %d cont:%p %d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen,
dTrace("node:%s, push %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen,
pBody, bodyLen, pos, queue->items);
return 0;
}
@ -269,8 +269,8 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
*pBodyLen = rawBodyLen;
*pFuncType = (EProcFuncType)ftype;
dTrace("node:%s, pop %s msg:%p %d body:%p %d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, rawHeadLen,
pBody, rawBodyLen, pos, queue->items);
dTrace("node:%s, pop %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen, pBody,
bodyLen, pos, queue->items);
return 1;
}

View File

@ -82,24 +82,27 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (msgType == TDMT_DND_NET_TEST) {
dmProcessNetTestReq(pDnode, pRpc);
code = 0;
goto _OVER;
return;
} else if (msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || msgType == TDMT_VND_FETCH_RSP) {
code = qWorkerProcessFetchRsp(NULL, NULL, pRpc);
pRpc->pCont = NULL; // will be freed in qworker
code = 0;
goto _OVER;
return;
} else {
}
if (pDnode->status != DND_STAT_RUNNING) {
if (msgType == TDMT_DND_SERVER_STATUS) {
dmProcessServerStartupStatus(pDnode, pRpc);
code = 0;
} else {
terrno = TSDB_CODE_APP_NOT_READY;
SRpcMsg rspMsg = {
.handle = pRpc->handle,
.code = TSDB_CODE_APP_NOT_READY,
.ahandle = pRpc->ahandle,
.refId = pRpc->refId,
};
rpcSendResponse(&rspMsg);
}
goto _OVER;
return;
}
if (isReq && pRpc->pCont == NULL) {
@ -282,15 +285,17 @@ static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SR
}
static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
if (!InChildProc(pWrapper->proc.ptype)) {
dmSendRpcRsp(pWrapper->pDnode, pRsp);
} else {
if (InChildProc(pWrapper->proc.ptype)) {
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
} else {
dmSendRpcRsp(pWrapper->pDnode, pRsp);
}
}
static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
if (!InChildProc(pWrapper->proc.ptype)) {
if (InChildProc(pWrapper->proc.ptype)) {
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
} else {
SRpcMsg rsp = {0};
SMEpSet msg = {.epSet = *pNewEpSet};
int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
@ -302,25 +307,23 @@ static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp
rsp.handle = pRsp->handle;
rsp.refId = pRsp->refId;
rpcSendResponse(&rsp);
} else {
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
}
}
static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
if (!InChildProc(pWrapper->proc.ptype)) {
rpcRegisterBrokenLinkArg(pMsg);
} else {
if (InChildProc(pWrapper->proc.ptype)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST);
} else {
rpcRegisterBrokenLinkArg(pMsg);
}
}
static inline void dmReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
if (!InChildProc(pWrapper->proc.ptype)) {
rpcReleaseHandle(handle, type);
} else {
if (InChildProc(pWrapper->proc.ptype)) {
SRpcMsg msg = {.handle = handle, .code = type};
dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE);
} else {
rpcReleaseHandle(handle, type);
}
}