refactor: enable multi-process mode
This commit is contained in:
parent
ddc692b9f8
commit
47dde96404
|
@ -45,7 +45,7 @@ extern bool tsPrintAuth;
|
||||||
extern int64_t tsTickPerMin[3];
|
extern int64_t tsTickPerMin[3];
|
||||||
|
|
||||||
// multi-process
|
// multi-process
|
||||||
extern bool tsMultiProcess;
|
extern int32_t tsMultiProcess;
|
||||||
extern int32_t tsMnodeShmSize;
|
extern int32_t tsMnodeShmSize;
|
||||||
extern int32_t tsVnodeShmSize;
|
extern int32_t tsVnodeShmSize;
|
||||||
extern int32_t tsQnodeShmSize;
|
extern int32_t tsQnodeShmSize;
|
||||||
|
|
|
@ -38,7 +38,7 @@ bool tsEnableSlaveQuery = true;
|
||||||
bool tsPrintAuth = false;
|
bool tsPrintAuth = false;
|
||||||
|
|
||||||
// multi process
|
// multi process
|
||||||
bool tsMultiProcess = false;
|
int32_t tsMultiProcess = 0;
|
||||||
int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 128;
|
int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 128;
|
||||||
int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 128;
|
int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 128;
|
||||||
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
|
||||||
|
@ -370,7 +370,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1;
|
||||||
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
|
||||||
|
|
||||||
if (cfgAddBool(pCfg, "multiProcess", tsMultiProcess, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
|
||||||
|
@ -552,7 +552,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval;
|
tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval;
|
||||||
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
|
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
|
||||||
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
|
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
|
||||||
tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval;
|
tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->i32;
|
||||||
|
|
||||||
tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval;
|
tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval;
|
||||||
tsMnodeShmSize = cfgGetItem(pCfg, "mnodeShmSize")->i32;
|
tsMnodeShmSize = cfgGetItem(pCfg, "mnodeShmSize")->i32;
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "dmMgmt.h"
|
#include "dmMgmt.h"
|
||||||
|
|
||||||
static int32_t dmInitParentProc(SMgmtWrapper *pWrapper) {
|
static int32_t dmCreateShm(SMgmtWrapper *pWrapper) {
|
||||||
int32_t shmsize = tsMnodeShmSize;
|
int32_t shmsize = tsMnodeShmSize;
|
||||||
if (pWrapper->ntype == VNODE) {
|
if (pWrapper->ntype == VNODE) {
|
||||||
shmsize = tsVnodeShmSize;
|
shmsize = tsVnodeShmSize;
|
||||||
|
@ -38,16 +38,10 @@ static int32_t dmInitParentProc(SMgmtWrapper *pWrapper) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->proc.shm.id, shmsize);
|
dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->proc.shm.id, shmsize);
|
||||||
|
|
||||||
if (dmInitProc(pWrapper) != 0) {
|
|
||||||
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dmNewNodeProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) {
|
static int32_t dmNewProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) {
|
||||||
char tstr[8] = {0};
|
char tstr[8] = {0};
|
||||||
char *args[6] = {0};
|
char *args[6] = {0};
|
||||||
snprintf(tstr, sizeof(tstr), "%d", ntype);
|
snprintf(tstr, sizeof(tstr), "%d", ntype);
|
||||||
|
@ -69,21 +63,6 @@ static int32_t dmNewNodeProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dmRunParentProc(SMgmtWrapper *pWrapper) {
|
|
||||||
if (pWrapper->pDnode->rtype == NODE_END) {
|
|
||||||
dInfo("node:%s, should be started manually in child process", pWrapper->name);
|
|
||||||
} else {
|
|
||||||
if (dmNewNodeProc(pWrapper, pWrapper->ntype) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (dmRunProc(&pWrapper->proc) != 0) {
|
|
||||||
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
if (taosMkDir(pWrapper->path) != 0) {
|
if (taosMkDir(pWrapper->path) != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
@ -108,6 +87,35 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
pWrapper->deployed = true;
|
pWrapper->deployed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (InParentProc(pWrapper->proc.ptype)) {
|
||||||
|
dDebug("node:%s, start to open", pWrapper->name);
|
||||||
|
if (dmCreateShm(pWrapper) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (dmWriteShmFile(pWrapper->path, pWrapper->name, &pWrapper->proc.shm) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!OnlyInTestProc(pWrapper->proc.ptype)) {
|
||||||
|
if (dmInitProc(pWrapper) != 0) {
|
||||||
|
dError("node:%s, failed to init proc since %s", pWrapper->name, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (pWrapper->pDnode->rtype == NODE_END) {
|
||||||
|
dInfo("node:%s, should be started manually in child process", pWrapper->name);
|
||||||
|
} else {
|
||||||
|
if (dmNewProc(pWrapper, pWrapper->ntype) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (dmRunProc(&pWrapper->proc) != 0) {
|
||||||
|
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dDebug("node:%s, has been opened in parent process", pWrapper->name);
|
||||||
|
}
|
||||||
|
|
||||||
if (InChildProc(pWrapper->proc.ptype)) {
|
if (InChildProc(pWrapper->proc.ptype)) {
|
||||||
dDebug("node:%s, start to open", pWrapper->name);
|
dDebug("node:%s, start to open", pWrapper->name);
|
||||||
if ((*pWrapper->func.openFp)(&input, &output) != 0) {
|
if ((*pWrapper->func.openFp)(&input, &output) != 0) {
|
||||||
|
@ -124,20 +132,6 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
pWrapper->deployed = true;
|
pWrapper->deployed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (InParentProc(pWrapper->proc.ptype)) {
|
|
||||||
dDebug("node:%s, start to open", pWrapper->name);
|
|
||||||
if (dmInitParentProc(pWrapper) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (dmWriteShmFile(pWrapper->path, pWrapper->name, &pWrapper->proc.shm) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (dmRunParentProc(pWrapper) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
dDebug("node:%s, has been opened in parent process", pWrapper->name);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (output.pMgmt != NULL) {
|
if (output.pMgmt != NULL) {
|
||||||
pWrapper->pMgmt = output.pMgmt;
|
pWrapper->pMgmt = output.pMgmt;
|
||||||
}
|
}
|
||||||
|
@ -246,7 +240,7 @@ static void dmCloseNodes(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmWatchNodes(SDnode *pDnode) {
|
static void dmWatchNodes(SDnode *pDnode) {
|
||||||
if (!InParentProc(pDnode->ptype)) return;
|
if (!OnlyInParentProc(pDnode->ptype)) return;
|
||||||
if (pDnode->rtype == NODE_END) return;
|
if (pDnode->rtype == NODE_END) return;
|
||||||
|
|
||||||
taosThreadMutexLock(&pDnode->mutex);
|
taosThreadMutexLock(&pDnode->mutex);
|
||||||
|
@ -255,12 +249,12 @@ static void dmWatchNodes(SDnode *pDnode) {
|
||||||
SProc *proc = &pWrapper->proc;
|
SProc *proc = &pWrapper->proc;
|
||||||
|
|
||||||
if (!pWrapper->required) continue;
|
if (!pWrapper->required) continue;
|
||||||
if (!InParentProc(proc->ptype)) continue;
|
if (!OnlyInParentProc(proc->ptype)) continue;
|
||||||
|
|
||||||
if (proc->pid <= 0 || !taosProcExist(proc->pid)) {
|
if (proc->pid <= 0 || !taosProcExist(proc->pid)) {
|
||||||
dWarn("node:%s, process:%d is killed and needs to restart", pWrapper->name, proc->pid);
|
dWarn("node:%s, process:%d is killed and needs to restart", pWrapper->name, proc->pid);
|
||||||
dmCloseProcRpcHandles(&pWrapper->proc);
|
dmCloseProcRpcHandles(&pWrapper->proc);
|
||||||
dmNewNodeProc(pWrapper, ntype);
|
dmNewProc(pWrapper, ntype);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pDnode->mutex);
|
taosThreadMutexUnlock(&pDnode->mutex);
|
||||||
|
|
|
@ -67,7 +67,7 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (InParentProc(proc->ptype) && !InChildProc(proc->ptype)) {
|
if (InParentProc(proc->ptype)) {
|
||||||
if (dmInitProcMutex(queue) != 0) {
|
if (dmInitProcMutex(queue) != 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -185,8 +185,8 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
|
||||||
taosThreadMutexUnlock(&queue->mutex);
|
taosThreadMutexUnlock(&queue->mutex);
|
||||||
tsem_post(&queue->sem);
|
tsem_post(&queue->sem);
|
||||||
|
|
||||||
dTrace("node:%s, push msg:%p %d cont:%p %d handle:%p, ftype:%s pos:%d remain:%d", queue->name, pHead, headLen, pBody,
|
dTrace("node:%s, push %s msg:%p %d cont:%p %d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen,
|
||||||
bodyLen, (void *)handle, dmFuncStr(ftype), pos, queue->items);
|
pBody, bodyLen, pos, queue->items);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -269,13 +269,15 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
|
||||||
*pBodyLen = rawBodyLen;
|
*pBodyLen = rawBodyLen;
|
||||||
*pFuncType = (EProcFuncType)ftype;
|
*pFuncType = (EProcFuncType)ftype;
|
||||||
|
|
||||||
dTrace("node:%s, pop msg:%p %d body:%p %d, ftype:%s pos:%d remain:%d", queue->name, pHead, rawHeadLen, pBody,
|
dTrace("node:%s, pop %s msg:%p %d body:%p %d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, rawHeadLen,
|
||||||
rawBodyLen, dmFuncStr(ftype), pos, queue->items);
|
pBody, rawBodyLen, pos, queue->items);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
|
int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
|
||||||
SProc *proc = &pWrapper->proc;
|
SProc *proc = &pWrapper->proc;
|
||||||
|
if (proc->name != NULL) return 0;
|
||||||
|
|
||||||
proc->wrapper = pWrapper;
|
proc->wrapper = pWrapper;
|
||||||
proc->name = pWrapper->name;
|
proc->name = pWrapper->name;
|
||||||
|
|
||||||
|
@ -319,7 +321,7 @@ static void *dmConsumChildQueue(void *param) {
|
||||||
do {
|
do {
|
||||||
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
|
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
|
||||||
if (numOfMsgs == 0) {
|
if (numOfMsgs == 0) {
|
||||||
dDebug("node:%s, get no msg from cueue and exit thread", proc->name);
|
dDebug("node:%s, get no msg from cqueue and exit thread", proc->name);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -330,11 +332,10 @@ static void *dmConsumChildQueue(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ftype != DND_FUNC_REQ) {
|
if (ftype != DND_FUNC_REQ) {
|
||||||
dFatal("node:%s, msg:%p from cqueue, invalid ftype:%d", proc->name, pHead, ftype);
|
dFatal("node:%s, get msg:%p from cqueue, invalid ftype:%d", proc->name, pHead, ftype);
|
||||||
taosFreeQitem(pHead);
|
taosFreeQitem(pHead);
|
||||||
rpcFreeCont(pBody);
|
rpcFreeCont(pBody);
|
||||||
} else {
|
} else {
|
||||||
dTrace("node:%s, msg:%p from cueue", proc->name, pHead);
|
|
||||||
pReq = pHead;
|
pReq = pHead;
|
||||||
pReq->rpcMsg.pCont = pBody;
|
pReq->rpcMsg.pCont = pBody;
|
||||||
code = dmProcessNodeMsg(pWrapper, pReq);
|
code = dmProcessNodeMsg(pWrapper, pReq);
|
||||||
|
@ -388,22 +389,22 @@ static void *dmConsumParentQueue(void *param) {
|
||||||
if (ftype == DND_FUNC_RSP) {
|
if (ftype == DND_FUNC_RSP) {
|
||||||
pRsp = pHead;
|
pRsp = pHead;
|
||||||
pRsp->pCont = pBody;
|
pRsp->pCont = pBody;
|
||||||
dTrace("node:%s, rsp msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
dTrace("node:%s, get rsp msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
||||||
dmRemoveProcRpcHandle(proc, pRsp->handle);
|
dmRemoveProcRpcHandle(proc, pRsp->handle);
|
||||||
rpcSendResponse(pRsp);
|
rpcSendResponse(pRsp);
|
||||||
} else if (ftype == DND_FUNC_REGIST) {
|
} else if (ftype == DND_FUNC_REGIST) {
|
||||||
pRsp = pHead;
|
pRsp = pHead;
|
||||||
dTrace("node:%s, regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
dTrace("node:%s, get regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
||||||
rpcRegisterBrokenLinkArg(pRsp);
|
rpcRegisterBrokenLinkArg(pRsp);
|
||||||
rpcFreeCont(pBody);
|
rpcFreeCont(pBody);
|
||||||
} else if (ftype == DND_FUNC_RELEASE) {
|
} else if (ftype == DND_FUNC_RELEASE) {
|
||||||
pRsp = pHead;
|
pRsp = pHead;
|
||||||
dTrace("node:%s, release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
dTrace("node:%s, get release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
|
||||||
dmRemoveProcRpcHandle(proc, pRsp->handle);
|
dmRemoveProcRpcHandle(proc, pRsp->handle);
|
||||||
rpcReleaseHandle(pRsp->handle, (int8_t)pRsp->code);
|
rpcReleaseHandle(pRsp->handle, (int8_t)pRsp->code);
|
||||||
rpcFreeCont(pBody);
|
rpcFreeCont(pBody);
|
||||||
} else {
|
} else {
|
||||||
dFatal("node:%s, msg:%p get from pqueue, invalid ftype:%d", proc->name, pHead, ftype);
|
dFatal("node:%s, get msg:%p from pqueue, invalid ftype:%d", proc->name, pHead, ftype);
|
||||||
rpcFreeCont(pBody);
|
rpcFreeCont(pBody);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -441,16 +442,17 @@ int32_t dmRunProc(SProc *proc) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmStopProc(SProc *proc) {
|
void dmStopProc(SProc *proc) {
|
||||||
|
proc->stop = true;
|
||||||
if (taosCheckPthreadValid(proc->pthread)) {
|
if (taosCheckPthreadValid(proc->pthread)) {
|
||||||
dDebug("node:%s, start to join pthread:%" PRId64, proc->name, proc->pthread);
|
dDebug("node:%s, start to join pthread:%" PRId64, proc->name, proc->pthread);
|
||||||
tsem_post(&proc->cqueue->sem);
|
tsem_post(&proc->pqueue->sem);
|
||||||
taosThreadJoin(proc->pthread, NULL);
|
taosThreadJoin(proc->pthread, NULL);
|
||||||
taosThreadClear(&proc->pthread);
|
taosThreadClear(&proc->pthread);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosCheckPthreadValid(proc->cthread)) {
|
if (taosCheckPthreadValid(proc->cthread)) {
|
||||||
dDebug("node:%s, start to join cthread:%" PRId64, proc->name, proc->cthread);
|
dDebug("node:%s, start to join cthread:%" PRId64, proc->name, proc->cthread);
|
||||||
tsem_post(&proc->pqueue->sem);
|
tsem_post(&proc->cqueue->sem);
|
||||||
taosThreadJoin(proc->cthread, NULL);
|
taosThreadJoin(proc->cthread, NULL);
|
||||||
taosThreadClear(&proc->cthread);
|
taosThreadClear(&proc->cthread);
|
||||||
}
|
}
|
||||||
|
@ -458,6 +460,7 @@ void dmStopProc(SProc *proc) {
|
||||||
|
|
||||||
void dmCleanupProc(struct SMgmtWrapper *pWrapper) {
|
void dmCleanupProc(struct SMgmtWrapper *pWrapper) {
|
||||||
SProc *proc = &pWrapper->proc;
|
SProc *proc = &pWrapper->proc;
|
||||||
|
if (proc->name == NULL) return;
|
||||||
|
|
||||||
dDebug("node:%s, start to clean up proc", pWrapper->name);
|
dDebug("node:%s, start to clean up proc", pWrapper->name);
|
||||||
dmStopProc(proc);
|
dmStopProc(proc);
|
||||||
|
|
|
@ -77,6 +77,9 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
|
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
|
||||||
SMgmtWrapper *pWrapper = NULL;
|
SMgmtWrapper *pWrapper = NULL;
|
||||||
|
|
||||||
|
dTrace("msg:%s is received, handle:%p app:%p cont:%p len:%d code:0x%04x refId:%" PRId64, TMSG_INFO(msgType),
|
||||||
|
pRpc->handle, pRpc->ahandle, pRpc->pCont, pRpc->contLen, pRpc->code, pRpc->refId);
|
||||||
|
|
||||||
if (msgType == TDMT_DND_NET_TEST) {
|
if (msgType == TDMT_DND_NET_TEST) {
|
||||||
dmProcessNetTestReq(pDnode, pRpc);
|
dmProcessNetTestReq(pDnode, pRpc);
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -110,18 +113,24 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
} else {
|
} else {
|
||||||
pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
|
pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
|
||||||
if (pHandle->needCheckVgId) {
|
if (pHandle->needCheckVgId) {
|
||||||
SMsgHead *pHead = pRpc->pCont;
|
if (pRpc->contLen > 0) {
|
||||||
int32_t vgId = ntohl(pHead->vgId);
|
SMsgHead *pHead = pRpc->pCont;
|
||||||
if (vgId == QNODE_HANDLE) {
|
int32_t vgId = ntohl(pHead->vgId);
|
||||||
pWrapper = &pDnode->wrappers[QNODE];
|
if (vgId == QNODE_HANDLE) {
|
||||||
} else if (vgId == MNODE_HANDLE) {
|
pWrapper = &pDnode->wrappers[QNODE];
|
||||||
pWrapper = &pDnode->wrappers[MNODE];
|
} else if (vgId == MNODE_HANDLE) {
|
||||||
|
pWrapper = &pDnode->wrappers[MNODE];
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||||
|
goto _OVER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("msg:%s is received, handle:%p app:%p", TMSG_INFO(msgType), pRpc->handle, pRpc->ahandle);
|
|
||||||
if (dmMarkWrapper(pWrapper) != 0) {
|
if (dmMarkWrapper(pWrapper) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else {
|
} else {
|
||||||
|
@ -138,7 +147,6 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (InParentProc(pWrapper->proc.ptype)) {
|
if (InParentProc(pWrapper->proc.ptype)) {
|
||||||
dTrace("msg:%p, put into cqueue, handle:%p refId:%" PRId64, pMsg, pRpc->handle, pRpc->refId);
|
|
||||||
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen,
|
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen,
|
||||||
(isReq && (pRpc->code == 0)) ? pRpc->handle : NULL, pRpc->refId, DND_FUNC_REQ);
|
(isReq && (pRpc->code == 0)) ? pRpc->handle : NULL, pRpc->refId, DND_FUNC_REQ);
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue