This commit is contained in:
Shengliang Guan 2022-03-15 17:00:16 +08:00
parent 06dbe69f87
commit 44cd8c00bb
25 changed files with 599 additions and 727 deletions

View File

@ -51,6 +51,7 @@ typedef struct {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
int32_t rspLen; int32_t rspLen;
void *pRsp; void *pRsp;
void *pNode;
} SNodeMsg; } SNodeMsg;
typedef struct SRpcInit { typedef struct SRpcInit {

View File

@ -56,7 +56,7 @@ extern "C" {
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
typedef enum { MNODE, VNODES, QNODE, SNODE, BNODE, DNODE, NODE_MAX } ENodeType; typedef enum { VNODES, QNODE, SNODE, MNODE, BNODE, DNODE, NODE_MAX } ENodeType;
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType; typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus; typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType; typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType;
@ -77,29 +77,6 @@ typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper);
typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper); typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper);
typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper); typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper);
typedef struct SMsgHandle {
NodeMsgFp msgFp;
SMgmtWrapper *pWrapper;
} SMsgHandle;
typedef struct SMgmtFp {
OpenNodeFp openFp;
CloseNodeFp closeFp;
RequireNodeFp requiredFp;
} SMgmtFp;
typedef struct SMgmtWrapper {
const char *name;
char *path;
bool required;
EProcType procType;
SProcObj *pProc;
void *pMgmt;
SDnode *pDnode;
NodeMsgFp msgFps[TDMT_MAX];
SMgmtFp fp;
} SMgmtWrapper;
typedef struct { typedef struct {
EWorkerType type; EWorkerType type;
const char *name; const char *name;
@ -114,6 +91,29 @@ typedef struct {
}; };
} SDnodeWorker; } SDnodeWorker;
typedef struct SMsgHandle {
NodeMsgFp msgFp;
SMgmtWrapper *pWrapper;
} SMsgHandle;
typedef struct SMgmtFp {
OpenNodeFp openFp;
CloseNodeFp closeFp;
RequireNodeFp requiredFp;
} SMgmtFp;
typedef struct SMgmtWrapper {
const char *name;
char *path;
bool required;
EProcType procType;
SProcObj *pProc;
void *pMgmt;
SDnode *pDnode;
NodeMsgFp msgFps[TDMT_MAX];
SMgmtFp fp;
} SMgmtWrapper;
typedef struct { typedef struct {
void *serverRpc; void *serverRpc;
void *clientRpc; void *clientRpc;
@ -143,6 +143,7 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
TdFilePtr dndCheckRunning(char *dataDir); TdFilePtr dndCheckRunning(char *dataDir);
SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType); SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp); void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
// dndMonitor.h // dndMonitor.h
void dndSendMonitorReport(SDnode *pDnode); void dndSendMonitorReport(SDnode *pDnode);
@ -153,6 +154,7 @@ void dndClose(SDnode *pDnode);
int32_t dndRun(SDnode *pDnode); int32_t dndRun(SDnode *pDnode);
void dndeHandleEvent(SDnode *pDnode, EDndEvent event); void dndeHandleEvent(SDnode *pDnode, EDndEvent event);
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
// dndTransport.h // dndTransport.h
int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg);

View File

@ -27,6 +27,7 @@ void dndClose(SDnode *pDnode);
int32_t dndRun(SDnode *pDnode); int32_t dndRun(SDnode *pDnode);
void dndeHandleEvent(SDnode *pDnode, EDndEvent event); void dndeHandleEvent(SDnode *pDnode, EDndEvent event);
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -126,3 +126,15 @@ TdFilePtr dndCheckRunning(char *dataDir) {
dDebug("file:%s is locked", filepath); dDebug("file:%s is locked", filepath);
return pFile; return pFile;
} }
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
dndGetStartup(pDnode, pStartup);
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)};
rpcSendResponse(&rpcRsp);
}

View File

@ -66,7 +66,7 @@ void dndSendMonitorReport(SDnode *pDnode) {
SMonClusterInfo clusterInfo = {0}; SMonClusterInfo clusterInfo = {0};
SMonVgroupInfo vgroupInfo = {0}; SMonVgroupInfo vgroupInfo = {0};
SMonGrantInfo grantInfo = {0}; SMonGrantInfo grantInfo = {0};
if (mmGetMonitorInfo(pDnode, &clusterInfo, &vgroupInfo, &grantInfo) == 0) { if (mmGetMonitorInfo(dndGetWrapper(pDnode, MNODE), &clusterInfo, &vgroupInfo, &grantInfo) == 0) {
monSetClusterInfo(pMonitor, &clusterInfo); monSetClusterInfo(pMonitor, &clusterInfo);
monSetVgroupInfo(pMonitor, &vgroupInfo); monSetVgroupInfo(pMonitor, &vgroupInfo);
monSetGrantInfo(pMonitor, &grantInfo); monSetGrantInfo(pMonitor, &grantInfo);

View File

@ -49,6 +49,10 @@ static void dndCloseNode(SMgmtWrapper *pWrapper) {
if (pWrapper->required) { if (pWrapper->required) {
(*pWrapper->fp.closeFp)(pWrapper); (*pWrapper->fp.closeFp)(pWrapper);
} }
if (pWrapper->pProc) {
taosProcCleanup(pWrapper->pProc);
pWrapper->pProc = NULL;
}
} }
static void dndClearMemory(SDnode *pDnode) { static void dndClearMemory(SDnode *pDnode) {
@ -171,6 +175,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
dInfo("node:%s, will start in single process", pWrapper->name); dInfo("node:%s, will start in single process", pWrapper->name);
pWrapper->procType = PROC_SINGLE;
if (dndOpenNode(pWrapper) != 0) { if (dndOpenNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1; return -1;
@ -189,14 +194,67 @@ static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
} }
} }
static void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) {
dmSendRedirectRsp(pWrapper->pDnode, pRsp);
} else {
rpcSendResponse(pRsp);
}
}
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
int32_t code = -1;
if (pWrapper->procType != PROC_CHILD) {
dndSendRpcRsp(pWrapper, pRsp);
} else {
do {
code = taosProcPutToParentQueue(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen);
if (code != 0) {
taosMsleep(10);
}
} while (code != 0);
}
}
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
dTrace("msg:%p, get from child queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
pRpc->pCont = pCont;
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
int32_t code = (*msgFp)(pWrapper, pMsg);
if (code != 0) {
bool isReq = (pRpc->msgType & 1U);
if (isReq) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
dndSendRsp(pWrapper, &rsp);
}
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pCont);
}
}
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRsp, int32_t msgLen, void *pCont, int32_t contLen) {
dTrace("msg:%p, get from parent queue", pRsp);
pRsp->pCont = pCont;
dndSendRpcRsp(pWrapper, pRsp);
free(pRsp);
}
static int32_t dndRunInMultiProcess(SDnode *pDnode) { static int32_t dndRunInMultiProcess(SDnode *pDnode) {
dInfo("dnode run in multi process mode");
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
if (n == DNODE) { if (n == DNODE) {
dInfo("node:%s, will start in parent process", pWrapper->name); dInfo("node:%s, will start in parent process", pWrapper->name);
pWrapper->procType = PROC_PARENT; pWrapper->procType = PROC_SINGLE;
if (dndOpenNode(pWrapper) != 0) { if (dndOpenNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1; return -1;
@ -204,7 +262,21 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
continue; continue;
} }
SProcCfg cfg = {0}; SProcCfg cfg = {.childQueueSize = 1024 * 1024 * 2, // size will be a configuration item
.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.parentQueueSize = 1024 * 1024 * 2, // size will be a configuration item
.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue,
.parentdMallocHeadFp = (ProcMallocFp)malloc,
.parentFreeHeadFp = (ProcFreeFp)free,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.testFlag = 0,
.pParent = pWrapper,
.name = pWrapper->name};
SProcObj *pProc = taosProcInit(&cfg); SProcObj *pProc = taosProcInit(&cfg);
if (pProc == NULL) { if (pProc == NULL) {
dError("node:%s, failed to fork since %s", pWrapper->name, terrstr()); dError("node:%s, failed to fork since %s", pWrapper->name, terrstr());
@ -227,6 +299,11 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
dInfo("node:%s, will not start in parent process", pWrapper->name); dInfo("node:%s, will not start in parent process", pWrapper->name);
pWrapper->procType = PROC_PARENT; pWrapper->procType = PROC_PARENT;
} }
if (taosProcRun(pProc) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
return -1;
}
} }
return 0; return 0;
@ -275,14 +352,6 @@ static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) {
return 0; return 0;
} }
static void dndSendRpcRsp(SDnode *pDnode, SRpcMsg *pRpc) {
if (pRpc->code == TSDB_CODE_APP_NOT_READY) {
dmSendRedirectRsp(pDnode, pRpc);
} else {
rpcSendResponse(pRpc);
}
}
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) { if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
dmUpdateMnodeEpSet(pWrapper->pDnode, pEpSet); dmUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
@ -307,19 +376,33 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
} }
dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user); dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user);
code = (*msgFp)(pWrapper, pMsg);
if (pWrapper->procType == PROC_SINGLE) {
code = (*msgFp)(pWrapper, pMsg);
} else if (pWrapper->procType == PROC_PARENT) {
code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen);
} else {
terrno = TSDB_CODE_MEMORY_CORRUPTED;
dError("msg:%p, won't be processed for it is child process", pMsg);
}
_OVER: _OVER:
if (code != 0) { if (code == 0) {
if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
}
} else {
dError("msg:%p, failed to process since %s", pMsg, terrstr()); dError("msg:%p, failed to process since %s", pMsg, terrstr());
bool isReq = (pRpc->msgType & 1U); bool isReq = (pRpc->msgType & 1U);
if (isReq) { if (isReq) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
dndSendRpcRsp(pWrapper->pDnode, &rsp); dndSendRsp(pWrapper, &rsp);
} }
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
} }
} }

View File

@ -25,8 +25,7 @@
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode *pDnode = parent; SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->trans; STransMgmt *pMgmt = &pDnode->trans;
tmsg_t msgType = pRsp->msgType;
tmsg_t msgType = pRsp->msgType;
if (dndGetStatus(pDnode) == DND_STAT_STOPPED) { if (dndGetStatus(pDnode) == DND_STAT_STOPPED) {
if (pRsp == NULL || pRsp->pCont == NULL) return; if (pRsp == NULL || pRsp->pCont == NULL) return;
@ -88,11 +87,11 @@ void dndCleanupClient(SDnode *pDnode) {
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
SDnode *pDnode = param; SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->trans; STransMgmt *pMgmt = &pDnode->trans;
tmsg_t msgType = pReq->msgType;
tmsg_t msgType = pReq->msgType;
if (msgType == TDMT_DND_NETWORK_TEST) { if (msgType == TDMT_DND_NETWORK_TEST) {
dTrace("RPC %p, network test req will be processed, app:%p", pReq->handle, pReq->ahandle); dTrace("RPC %p, network test req will be processed, app:%p", pReq->handle, pReq->ahandle);
dmProcessStartupReq(pDnode, pReq); dndProcessStartupReq(pDnode, pReq);
return; return;
} }
@ -249,8 +248,8 @@ void dndCleanupServer(SDnode *pDnode) {
int32_t dndInitMsgHandle(SDnode *pDnode) { int32_t dndInitMsgHandle(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->trans; STransMgmt *pMgmt = &pDnode->trans;
for (ENodeType nodeType = 0; nodeType < NODE_MAX; ++nodeType) { for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
NodeMsgFp msgFp = pWrapper->msgFps[msgIndex]; NodeMsgFp msgFp = pWrapper->msgFps[msgIndex];

View File

@ -51,9 +51,6 @@ void dmUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg); void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
// dmHandle.h
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -24,7 +24,6 @@ extern "C" {
void dmInitMsgHandles(SMgmtWrapper *pWrapper); void dmInitMsgHandles(SMgmtWrapper *pWrapper);
void dmSendStatusReq(SDnodeMgmt *pMgmt); void dmSendStatusReq(SDnodeMgmt *pMgmt);
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq); int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq);
void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp); void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp);
void dmProcessAuthRsp(SDnode *pDnode, SRpcMsg *pRsp); void dmProcessAuthRsp(SDnode *pDnode, SRpcMsg *pRsp);

View File

@ -22,8 +22,8 @@
extern "C" { extern "C" {
#endif #endif
int32_t dmStartWorker(); int32_t dmStartWorker(SDnodeMgmt *pMgmt);
void dmStopWorker(); void dmStopWorker(SDnodeMgmt *pMgmt);
int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -151,8 +151,8 @@ PRASE_DNODE_OVER:
dmResetDnodes(pMgmt, pMgmt->pDnodeEps); dmResetDnodes(pMgmt, pMgmt->pDnodeEps);
terrno = 0; terrno = code;
return 0; return code;
} }
int32_t dmWriteFile(SDnodeMgmt *pMgmt) { int32_t dmWriteFile(SDnodeMgmt *pMgmt) {

View File

@ -108,6 +108,7 @@ void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
int32_t dmInit(SMgmtWrapper *pWrapper) { int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt)); SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt));
dInfo("dnode-mgmt is initialized");
pMgmt->dnodeId = 0; pMgmt->dnodeId = 0;
pMgmt->dropped = 0; pMgmt->dropped = 0;
@ -120,23 +121,23 @@ int32_t dmInit(SMgmtWrapper *pWrapper) {
pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMgmt->dnodeHash == NULL) { if (pMgmt->dnodeHash == NULL) {
dError("node:%s, failed to init dnode hash", pWrapper->name); dError("failed to init dnode hash");
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
if (dmReadFile(pMgmt) != 0) { if (dmReadFile(pMgmt) != 0) {
dError("node:%s, failed to read file since %s", pWrapper->name, terrstr()); dError("failed to read file since %s", terrstr());
return -1; return -1;
} }
if (pMgmt->dropped) { if (pMgmt->dropped) {
dError("node:%s, will not start since its already dropped", pWrapper->name); dError("dnode will not start since its already dropped");
return -1; return -1;
} }
if (dmStartWorker(pMgmt) != 0) { if (dmStartWorker(pMgmt) != 0) {
dError("node:%s, failed to start worker since %s", pWrapper->name, terrstr()); dError("failed to start dnode worker since %s", terrstr());
return -1; return -1;
} }
@ -153,6 +154,7 @@ void dmCleanup(SMgmtWrapper *pWrapper) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt; SDnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return; if (pMgmt == NULL) return;
dInfo("dnode-mgmt start to clean up");
dmStopWorker(pMgmt); dmStopWorker(pMgmt);
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
@ -168,6 +170,9 @@ void dmCleanup(SMgmtWrapper *pWrapper) {
} }
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
free(pMgmt);
pWrapper->pMgmt = NULL;
dInfo("dnode-mgmt is cleaned up"); dInfo("dnode-mgmt is cleaned up");
} }

View File

@ -102,18 +102,6 @@ int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq) {
return TSDB_CODE_OPS_NOT_SUPPORT; return TSDB_CODE_OPS_NOT_SUPPORT;
} }
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
dndGetStartup(pDnode, pStartup);
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)};
rpcSendResponse(&rpcRsp);
}
void dmInitMsgHandles(SMgmtWrapper *pWrapper) { void dmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by DNODE // Requests handled by DNODE
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg);

View File

@ -57,7 +57,7 @@ static void *dmThreadRoutine(void *param) {
static void dmProcessMgmtQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) { static void dmProcessMgmtQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) {
int32_t code = 0; int32_t code = 0;
SRpcMsg *pMsg = &pNodeMsg->rpcMsg; SRpcMsg *pMsg = &pNodeMsg->rpcMsg;
dTrace("msg:%p, will be processed in mgmt queue", pNodeMsg); dTrace("msg:%p, will be processed", pNodeMsg);
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_DND_CREATE_MNODE: case TDMT_DND_CREATE_MNODE:

View File

@ -22,8 +22,8 @@
extern "C" { extern "C" {
#endif #endif
int32_t mmReadFile(SDnode *pDnode); int32_t mmReadFile(SMnodeMgmt *pMgmt);
int32_t mmWriteFile(SDnode *pDnode); int32_t mmWriteFile(SMnodeMgmt *pMgmt);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -30,31 +30,35 @@ typedef struct SMnodeMgmt {
int8_t selfIndex; int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
SMnode *pMnode; SMnode *pMnode;
SDnode *pDnode;
SProcObj *pProcess; SProcObj *pProcess;
bool singleProc; bool singleProc;
SRWLatch latch; SRWLatch latch;
const char *path;
SDnodeWorker readWorker; SDnodeWorker readWorker;
SDnodeWorker writeWorker; SDnodeWorker writeWorker;
SDnodeWorker syncWorker; SDnodeWorker syncWorker;
} SMnodeMgmt; } SMnodeMgmt;
// mmInt.h // interface
void mmGetMgmtFp(SMgmtWrapper *pMgmt); void mmGetMgmtFp(SMgmtWrapper *pMgmt);
int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo);
// mmMgmt.h // mmInt.h
int32_t mmInit(SDnode *pDnode); SMnode *mmAcquire(SMnodeMgmt *pMgmt);
void mmCleanup(SDnode *pDnode); void mmRelease(SMnodeMgmt *pMgmt, SMnode *pMnode);
int32_t mmOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption);
int32_t mmAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption);
int32_t mmDrop(SMnodeMgmt *pMgmt);
int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate);
// mmHandle.h // mmHandle.h
int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -1,36 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_MNODE_MGMT_H_
#define _TD_DND_MNODE_MGMT_H_
#include "mmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
SMnode *mmAcquire(SDnode *pDnode);
void mmRelease(SDnode *pDnode, SMnode *pMnode);
int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption);
int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption);
int32_t mmDrop(SDnode *pDnode);
int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_MNODE_MGMT_H_*/

View File

@ -27,11 +27,6 @@ int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -22,19 +22,15 @@
extern "C" { extern "C" {
#endif #endif
int32_t mmStartWorker(SDnode *pDnode); int32_t mmStartWorker(SMnodeMgmt *pMgmt);
void mmStopWorker(SDnode *pDnode); void mmStopWorker(SMnodeMgmt *pMgmt);
void mmInitMsgFp(SMnodeMgmt *pMgmt);
void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -14,22 +14,19 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mmInt.h" #include "mmFile.h"
#if 0 int32_t mmReadFile(SMnodeMgmt *pMgmt) {
int32_t mmReadFile(SDnode *pDnode) { int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR;
SMnodeMgmt *pMgmt = &pDnode->mmgmt; int32_t len = 0;
int32_t maxLen = 4096;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
char file[PATH_MAX];
TdFilePtr pFile = NULL;
int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR; snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
int32_t len = 0; pFile = taosOpenFile(file, TD_FILE_READ);
int32_t maxLen = 4096;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
char file[PATH_MAX + 20];
snprintf(file, sizeof(file), "%s%smnode.json", pDnode->dir.dnode, TD_DIRSEP);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) { if (pFile == NULL) {
dDebug("file %s not exist", file); dDebug("file %s not exist", file);
code = 0; code = 0;
@ -115,11 +112,9 @@ PRASE_MNODE_OVER:
return code; return code;
} }
int32_t mmWriteFile(SDnode *pDnode) { int32_t mmWriteFile(SMnodeMgmt *pMgmt) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
char file[PATH_MAX]; char file[PATH_MAX];
snprintf(file, sizeof(file), "%s%smnode.json.bak", pDnode->dir.dnode, TD_DIRSEP); snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
@ -156,7 +151,7 @@ int32_t mmWriteFile(SDnode *pDnode) {
free(content); free(content);
char realfile[PATH_MAX + 20]; char realfile[PATH_MAX + 20];
snprintf(realfile, sizeof(realfile), "%s%smnode.json", pDnode->dir.dnode, TD_DIRSEP); snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
if (taosRenameFile(file, realfile) != 0) { if (taosRenameFile(file, realfile) != 0) {
terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
@ -167,5 +162,3 @@ int32_t mmWriteFile(SDnode *pDnode) {
dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
return 0; return 0;
} }
#endif

View File

@ -15,15 +15,286 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mmInt.h" #include "mmInt.h"
#include "dmInt.h"
#include "mmFile.h"
#include "mmMsg.h" #include "mmMsg.h"
#include "mmWorker.h"
bool mmRequireNode(SMgmtWrapper *pWrapper) { return false; } SMnode *mmAcquire(SMnodeMgmt *pMgmt) {
SMnode *pMnode = NULL;
int32_t refCount = 0;
taosRLockLatch(&pMgmt->latch);
if (pMgmt->deployed && !pMgmt->dropped) {
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
pMnode = pMgmt->pMnode;
} else {
terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
}
taosRUnLockLatch(&pMgmt->latch);
if (pMnode != NULL) {
dTrace("acquire mnode, refCount:%d", refCount);
}
return pMnode;
}
void mmRelease(SMnodeMgmt *pMgmt, SMnode *pMnode) {
if (pMnode == NULL) return;
taosRLockLatch(&pMgmt->latch);
int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosRUnLockLatch(&pMgmt->latch);
dTrace("release mnode, refCount:%d", refCount);
}
int32_t mmOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SMnode *pMnode = mndOpen(pMgmt->path, pOption);
if (pMnode == NULL) {
dError("failed to open mnode since %s", terrstr());
return -1;
}
if (mmStartWorker(pMgmt) != 0) {
dError("failed to start mnode worker since %s", terrstr());
mndClose(pMnode);
mndDestroy(pMgmt->path);
return -1;
}
pMgmt->deployed = 1;
if (mmWriteFile(pMgmt) != 0) {
dError("failed to write mnode file since %s", terrstr());
pMgmt->deployed = 0;
mmStopWorker(pMgmt);
mndClose(pMnode);
mndDestroy(pMgmt->path);
return -1;
}
taosWLockLatch(&pMgmt->latch);
pMgmt->pMnode = pMnode;
pMgmt->deployed = 1;
taosWUnLockLatch(&pMgmt->latch);
dInfo("mnode open successfully");
return 0;
}
int32_t mmAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SMnode *pMnode = mmAcquire(pMgmt);
if (pMnode == NULL) {
dError("failed to alter mnode since %s", terrstr());
return -1;
}
if (mndAlter(pMnode, pOption) != 0) {
dError("failed to alter mnode since %s", terrstr());
mmRelease(pMgmt, pMnode);
return -1;
}
mmRelease(pMgmt, pMnode);
return 0;
}
int32_t mmDrop(SMnodeMgmt *pMgmt) {
SMnode *pMnode = mmAcquire(pMgmt);
if (pMnode == NULL) {
dError("failed to drop mnode since %s", terrstr());
return -1;
}
taosRLockLatch(&pMgmt->latch);
pMgmt->dropped = 1;
taosRUnLockLatch(&pMgmt->latch);
if (mmWriteFile(pMgmt) != 0) {
taosRLockLatch(&pMgmt->latch);
pMgmt->dropped = 0;
taosRUnLockLatch(&pMgmt->latch);
mmRelease(pMgmt, pMnode);
dError("failed to drop mnode since %s", terrstr());
return -1;
}
mmRelease(pMgmt, pMnode);
mmStopWorker(pMgmt);
pMgmt->deployed = 0;
mmWriteFile(pMgmt);
mndClose(pMnode);
pMgmt->pMnode = NULL;
mndDestroy(pMgmt->path);
return 0;
}
static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
pOption->pDnode = pDnode;
pOption->sendReqToDnodeFp = dndSendReqToDnode;
pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dmSendRedirectRsp;
pOption->putReqToMWriteQFp = mmPutMsgToWriteQueue;
pOption->putReqToMReadQFp = mmPutMsgToReadQueue;
pOption->dnodeId = dmGetDnodeId(pDnode);
pOption->clusterId = dmGetClusterId(pDnode);
}
static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
mmInitOption(pMgmt, pOption);
pOption->replica = 1;
pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0];
pReplica->id = 1;
pReplica->port = pDnode->cfg.serverPort;
memcpy(pReplica->fqdn, pDnode->cfg.localFqdn, TSDB_FQDN_LEN);
pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica;
memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}
static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
mmInitOption(pMgmt, pOption);
pOption->selfIndex = pMgmt->selfIndex;
pOption->replica = pMgmt->replica;
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}
int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
SDnode *pDnode = pMgmt->pDnode;
mmInitOption(pMgmt, pOption);
pOption->dnodeId = dmGetDnodeId(pDnode);
pOption->clusterId = dmGetClusterId(pDnode);
pOption->replica = pCreate->replica;
pOption->selfIndex = -1;
for (int32_t i = 0; i < pCreate->replica; ++i) {
SReplica *pReplica = &pOption->replicas[i];
pReplica->id = pCreate->replicas[i].id;
pReplica->port = pCreate->replicas[i].port;
memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
if (pReplica->id == pOption->dnodeId) {
pOption->selfIndex = i;
}
}
if (pOption->selfIndex == -1) {
dError("failed to build mnode options since %s", terrstr());
return -1;
}
pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica;
memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
return 0;
}
static void mmCleanup(SMgmtWrapper *pWrapper) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("mnode-mgmt start to clean up");
if (pMgmt->pMnode) {
mmStopWorker(pMgmt);
mndClose(pMgmt->pMnode);
pMgmt->pMnode = NULL;
}
free(pMgmt);
pWrapper->pMgmt = NULL;
dInfo("mnode-mgmt is cleaned up");
}
static int32_t mmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SMnodeMgmt *pMgmt = calloc(1, sizeof(SMnodeMgmt));
int32_t code = -1;
SMnodeOpt option = {0};
dInfo("mnode-mgmt is initialized");
pMgmt->path = pWrapper->path;
pMgmt->pDnode = pWrapper->pDnode;
taosInitRWLatch(&pMgmt->latch);
if (mmReadFile(pMgmt) != 0) {
dError("failed to read file since %s", terrstr());
goto _OVER;
}
if (!pMgmt->deployed) {
dInfo("mnode start to deploy");
mmBuildOptionForDeploy(pMgmt, &option);
code = mmOpen(pMgmt, &option);
} else {
dInfo("mnode start to open");
mmBuildOptionForOpen(pMgmt, &option);
code = mmOpen(pMgmt, &option);
}
_OVER:
if (code == 0) {
dInfo("mnode-mgmt is initialized");
} else {
dError("failed to init mnode-mgmtsince %s", terrstr());
mmCleanup(pWrapper);
}
return code;
}
static bool mmDeployRequired(SDnode *pDnode) {
if (dmGetDnodeId(pDnode) > 0) {
return false;
}
if (dmGetClusterId(pDnode) > 0) {
return false;
}
if (strcmp(pDnode->cfg.localEp, pDnode->cfg.firstEp) != 0) {
return false;
}
return true;
}
static bool mmRequire(SMgmtWrapper *pWrapper) {
SMnodeMgmt mgmt = {0};
mgmt.path = pWrapper->path;
if (mmReadFile(&mgmt) != 0) {
return false;
}
if (mgmt.dropped) {
dInfo("mnode has been dropped and needs to be deleted");
mndDestroy(mgmt.path);
return false;
}
if (mgmt.deployed) {
dInfo("mnode has been deployed");
return true;
}
bool required = mmDeployRequired(pWrapper->pDnode);
if (required) {
dInfo("mnode need to be deployed");
}
return required;
}
void mmGetMgmtFp(SMgmtWrapper *pWrapper) { void mmGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0}; SMgmtFp mgmtFp = {0};
mgmtFp.openFp = NULL; mgmtFp.openFp = mmInit;
mgmtFp.closeFp = NULL; mgmtFp.closeFp = NULL;
mgmtFp.requiredFp = mmRequireNode; mgmtFp.requiredFp = mmRequire;
mmInitMsgHandles(pWrapper); mmInitMsgHandles(pWrapper);
pWrapper->name = "mnode"; pWrapper->name = "mnode";
@ -31,10 +302,29 @@ void mmGetMgmtFp(SMgmtWrapper *pWrapper) {
} }
int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
return 0; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
SMnode *pMnode = mmAcquire(pMgmt);
if (pMnode == NULL) {
terrno = TSDB_CODE_APP_NOT_READY;
dTrace("failed to get user auth since %s", terrstr());
return -1;
}
int32_t code = mndRetriveAuth(pMnode, user, spi, encrypt, secret, ckey);
mmRelease(pMgmt, pMnode);
dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt);
return code;
} }
int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, int32_t mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) { SMonGrantInfo *pGrantInfo) {
return 0; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
SMnode *pMnode = mmAcquire(pMgmt);
if (pMnode == NULL) return -1;
int32_t code = mndGetMonitorInfo(pMnode, pClusterInfo, pVgroupInfo, pGrantInfo);
mmRelease(pMgmt, pMnode);
return code;
} }

View File

@ -1,321 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mmInt.h"
#include "dmInt.h"
#include "dndTransport.h"
#if 0
static void mmInitOption(SDnode *pDnode, SMnodeOpt *pOption);
static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption);
static void mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption);
static bool mmDeployRequired(SDnode *pDnode);
static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption);
int32_t mmInit(SDnode *pDnode) {
dInfo("mnode mgmt start to init");
int32_t code = -1;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
taosInitRWLatch(&pMgmt->latch);
mmInitMsgFp(pMgmt);
if (mmReadFile(pDnode) != 0) {
goto _OVER;
}
if (pMgmt->dropped) {
dInfo("mnode has been dropped and needs to be deleted");
mndDestroy(pDnode->dir.mnode);
code = 0;
goto _OVER;
}
if (!pMgmt->deployed) {
bool required = mmDeployRequired(pDnode);
if (!required) {
dInfo("mnode does not need to be deployed");
code = 0;
goto _OVER;
}
dInfo("mnode start to deploy");
SMnodeOpt option = {0};
mmBuildOptionForDeploy(pDnode, &option);
code = mmOpen(pDnode, &option);
} else {
dInfo("mnode start to open");
SMnodeOpt option = {0};
mmBuildOptionForOpen(pDnode, &option);
code = mmOpen(pDnode, &option);
}
_OVER:
if (code == 0) {
dInfo("mnode mgmt init success");
} else {
dError("failed to init mnode mgmt since %s", terrstr());
mmCleanup(pDnode);
}
return code;
}
void mmCleanup(SDnode *pDnode) {
dInfo("mnode mgmt start to clean up");
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
if (pMgmt->pMnode) {
mmStopWorker(pDnode);
mndClose(pMgmt->pMnode);
pMgmt->pMnode = NULL;
}
dInfo("mnode mgmt is cleaned up");
}
SMnode *mmAcquire(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = NULL;
int32_t refCount = 0;
taosRLockLatch(&pMgmt->latch);
if (pMgmt->deployed && !pMgmt->dropped) {
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
pMnode = pMgmt->pMnode;
} else {
terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
}
taosRUnLockLatch(&pMgmt->latch);
if (pMnode != NULL) {
dTrace("acquire mnode, refCount:%d", refCount);
}
return pMnode;
}
void mmRelease(SDnode *pDnode, SMnode *pMnode) {
if (pMnode == NULL) return;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
taosRLockLatch(&pMgmt->latch);
int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosRUnLockLatch(&pMgmt->latch);
dTrace("release mnode, refCount:%d", refCount);
}
int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->singleProc = true;
int32_t code = mmOpenImp(pDnode, pOption);
if (code == 0 && !pMgmt->singleProc) {
SProcCfg cfg = {.childQueueSize = 1024 * 1024,
.childConsumeFp = (ProcConsumeFp)mmConsumeChildQueue,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.parentQueueSize = 1024 * 1024,
.parentConsumeFp = (ProcConsumeFp)mmConsumeParentQueue,
.parentdMallocHeadFp = (ProcMallocFp)malloc,
.parentFreeHeadFp = (ProcFreeFp)free,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.testFlag = 0,
.pParent = pDnode,
.name = "mnode"};
pMgmt->pProcess = taosProcInit(&cfg);
if (pMgmt->pProcess == NULL) {
return -1;
}
return taosProcRun(pMgmt->pProcess);
}
return code;
}
int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = mmAcquire(pDnode);
if (pMnode == NULL) {
dError("failed to alter mnode since %s", terrstr());
return -1;
}
if (mndAlter(pMnode, pOption) != 0) {
dError("failed to alter mnode since %s", terrstr());
mmRelease(pDnode, pMnode);
return -1;
}
mmRelease(pDnode, pMnode);
return 0;
}
int32_t mmDrop(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = mmAcquire(pDnode);
if (pMnode == NULL) {
dError("failed to drop mnode since %s", terrstr());
return -1;
}
taosRLockLatch(&pMgmt->latch);
pMgmt->dropped = 1;
taosRUnLockLatch(&pMgmt->latch);
if (mmWriteFile(pDnode) != 0) {
taosRLockLatch(&pMgmt->latch);
pMgmt->dropped = 0;
taosRUnLockLatch(&pMgmt->latch);
mmRelease(pDnode, pMnode);
dError("failed to drop mnode since %s", terrstr());
return -1;
}
mmRelease(pDnode, pMnode);
mmStopWorker(pDnode);
pMgmt->deployed = 0;
mmWriteFile(pDnode);
mndClose(pMnode);
pMgmt->pMnode = NULL;
mndDestroy(pDnode->dir.mnode);
return 0;
}
static bool mmDeployRequired(SDnode *pDnode) {
if (dmGetDnodeId(pDnode) > 0) {
return false;
}
if (dmGetClusterId(pDnode) > 0) {
return false;
}
if (strcmp(pDnode->cfg.localEp, pDnode->cfg.firstEp) != 0) {
return false;
}
return true;
}
static void mmInitOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption->pDnode = pDnode;
pOption->sendReqToDnodeFp = dndSendReqToDnode;
pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dmSendRedirectRsp;
pOption->putReqToMWriteQFp = mmPutMsgToWriteQueue;
pOption->putReqToMReadQFp = mmPutMsgToReadQueue;
pOption->dnodeId = dmGetDnodeId(pDnode);
pOption->clusterId = dmGetClusterId(pDnode);
}
static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) {
mmInitOption(pDnode, pOption);
pOption->replica = 1;
pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0];
pReplica->id = 1;
pReplica->port = pDnode->cfg.serverPort;
memcpy(pReplica->fqdn, pDnode->cfg.localFqdn, TSDB_FQDN_LEN);
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica;
memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}
static void mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption) {
mmInitOption(pDnode, pOption);
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pOption->selfIndex = pMgmt->selfIndex;
pOption->replica = pMgmt->replica;
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}
int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
mmInitOption(pDnode, pOption);
pOption->dnodeId = dmGetDnodeId(pDnode);
pOption->clusterId = dmGetClusterId(pDnode);
pOption->replica = pCreate->replica;
pOption->selfIndex = -1;
for (int32_t i = 0; i < pCreate->replica; ++i) {
SReplica *pReplica = &pOption->replicas[i];
pReplica->id = pCreate->replicas[i].id;
pReplica->port = pCreate->replicas[i].port;
memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
if (pReplica->id == pOption->dnodeId) {
pOption->selfIndex = i;
}
}
if (pOption->selfIndex == -1) {
dError("failed to build mnode options since %s", terrstr());
return -1;
}
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica;
memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
return 0;
}
static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption);
if (pMnode == NULL) {
dError("failed to open mnode since %s", terrstr());
return -1;
}
if (mmStartWorker(pDnode) != 0) {
dError("failed to start mnode worker since %s", terrstr());
mndClose(pMnode);
mndDestroy(pDnode->dir.mnode);
return -1;
}
pMgmt->deployed = 1;
if (mmWriteFile(pDnode) != 0) {
dError("failed to write mnode file since %s", terrstr());
pMgmt->deployed = 0;
mmStopWorker(pDnode);
mndClose(pMnode);
mndDestroy(pDnode->dir.mnode);
return -1;
}
taosWLockLatch(&pMgmt->latch);
pMgmt->pMnode = pMnode;
pMgmt->deployed = 1;
taosWUnLockLatch(&pMgmt->latch);
dInfo("mnode open successfully");
return 0;
}
#endif

View File

@ -15,12 +15,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mmMsg.h" #include "mmMsg.h"
#include "dmInt.h"
#include "mmWorker.h" #include "mmWorker.h"
#if 0
#include "dmInt.h"
int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE);
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
SDCreateMnodeReq createReq = {0}; SDCreateMnodeReq createReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
@ -34,25 +35,28 @@ int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
} }
SMnodeOpt option = {0}; SMnodeOpt option = {0};
if (mmBuildOptionFromReq(pDnode, &option, &createReq) != 0) { if (mmBuildOptionFromReq(pMgmt, &option, &createReq) != 0) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr()); dError("failed to create mnode since %s", terrstr());
return -1; return -1;
} }
SMnode *pMnode = mmAcquire(pDnode); SMnode *pMnode = mmAcquire(pMgmt);
if (pMnode != NULL) { if (pMnode != NULL) {
mmRelease(pDnode, pMnode); mmRelease(pMgmt, pMnode);
terrno = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; terrno = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED;
dError("failed to create mnode since %s", terrstr()); dError("failed to create mnode since %s", terrstr());
return -1; return -1;
} }
dDebug("start to create mnode"); dDebug("start to create mnode");
return mmOpen(pDnode, &option); return mmOpen(pMgmt, &option);
} }
int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) { int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE);
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
SDAlterMnodeReq alterReq = {0}; SDAlterMnodeReq alterReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
@ -66,13 +70,13 @@ int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) {
} }
SMnodeOpt option = {0}; SMnodeOpt option = {0};
if (mmBuildOptionFromReq(pDnode, &option, &alterReq) != 0) { if (mmBuildOptionFromReq(pMgmt, &option, &alterReq) != 0) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to alter mnode since %s", terrstr()); dError("failed to alter mnode since %s", terrstr());
return -1; return -1;
} }
SMnode *pMnode = mmAcquire(pDnode); SMnode *pMnode = mmAcquire(pMgmt);
if (pMnode == NULL) { if (pMnode == NULL) {
terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
dError("failed to alter mnode since %s", terrstr()); dError("failed to alter mnode since %s", terrstr());
@ -80,13 +84,16 @@ int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) {
} }
dDebug("start to alter mnode"); dDebug("start to alter mnode");
int32_t code = mmAlter(pDnode, &option); int32_t code = mmAlter(pMgmt, &option);
mmRelease(pDnode, pMnode); mmRelease(pMgmt, pMnode);
return code; return code;
} }
int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE);
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
SDDropMnodeReq dropReq = {0}; SDDropMnodeReq dropReq = {0};
if (tDeserializeSMCreateDropMnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSMCreateDropMnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
@ -99,7 +106,7 @@ int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1; return -1;
} }
SMnode *pMnode = mmAcquire(pDnode); SMnode *pMnode = mmAcquire(pMgmt);
if (pMnode == NULL) { if (pMnode == NULL) {
terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
dError("failed to drop mnode since %s", terrstr()); dError("failed to drop mnode since %s", terrstr());
@ -107,45 +114,12 @@ int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
} }
dDebug("start to drop mnode"); dDebug("start to drop mnode");
int32_t code = mmDrop(pDnode); int32_t code = mmDrop(pMgmt);
mmRelease(pDnode, pMnode); mmRelease(pMgmt, pMnode);
return code; return code;
} }
int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) {
SMnode *pMnode = mmAcquire(pDnode);
if (pMnode == NULL) return -1;
int32_t code = mndGetMonitorInfo(pMnode, pClusterInfo, pVgroupInfo, pGrantInfo);
mmRelease(pDnode, pMnode);
return code;
}
int32_t mmGetUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = mmAcquire(pDnode);
if (pMnode == NULL) {
terrno = TSDB_CODE_APP_NOT_READY;
dTrace("failed to get user auth since %s", terrstr());
return -1;
}
int32_t code = mndRetriveAuth(pMnode, user, spi, encrypt, secret, ckey);
mmRelease(pDnode, pMnode);
dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt);
return code;
}
#endif
int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
void mmInitMsgHandles(SMgmtWrapper *pWrapper) { void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by DNODE // Requests handled by DNODE
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg);

View File

@ -20,16 +20,63 @@
#include "dndTransport.h" #include "dndTransport.h"
#include "dndWorker.h" #include "dndWorker.h"
#if 0 static void mmSendRpcRsp(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) {
static int32_t mmProcessWriteMsg(SDnode *pDnode, SMndMsg *pMsg); if (pRpc->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRpc->code == TSDB_CODE_APP_NOT_READY) {
static int32_t mmProcessSyncMsg(SDnode *pDnode, SMndMsg *pMsg); dmSendRedirectRsp(pMgmt->pDnode, pRpc);
static int32_t mmProcessReadMsg(SDnode *pDnode, SMndMsg *pMsg); } else {
static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMsg *pMsg); rpcSendResponse(pRpc);
static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc); }
static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg); }
int32_t mmStartWorker(SDnode *pDnode) { void mmPutRpcRspToWorker(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; int32_t code = -1;
if (pMgmt->singleProc) {
mmSendRpcRsp(pMgmt, pRpc);
} else {
do {
code = taosProcPutToParentQueue(pMgmt->pProcess, pRpc, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen);
if (code != 0) {
taosMsleep(10);
}
} while (code != 0);
}
}
static void mmConsumeMsgQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed", pMsg);
SMnode *pMnode = mmAcquire(pMgmt);
SRpcMsg *pRpc = &pMsg->rpcMsg;
bool isReq = (pRpc->msgType & 1U);
int32_t code = -1;
if (pMnode != NULL) {
pMsg->pNode = pMnode;
code = mndProcessMsg((SMndMsg*)pMsg);
mmRelease(pMgmt, pMnode);
}
if (isReq) {
if (pMsg->rpcMsg.handle == NULL) return;
if (code == 0) {
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
mmPutRpcRspToWorker(pMgmt, &rsp);
} else {
if (terrno != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp, .code = terrno};
mmPutRpcRspToWorker(pMgmt, &rsp);
}
}
}
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pRpc->pCont);
taosFreeQitem(pMsg);
}
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SDnode *pDnode = pMgmt->pDnode;
if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeMsgQueue) != 0) { if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeMsgQueue) != 0) {
dError("failed to start mnode read worker since %s", terrstr()); dError("failed to start mnode read worker since %s", terrstr());
return -1; return -1;
@ -48,9 +95,7 @@ int32_t mmStartWorker(SDnode *pDnode) {
return 0; return 0;
} }
void mmStopWorker(SDnode *pDnode) { void mmStopWorker(SMnodeMgmt *pMgmt) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
pMgmt->deployed = 0; pMgmt->deployed = 0;
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
@ -64,122 +109,41 @@ void mmStopWorker(SDnode *pDnode) {
dndCleanupWorker(&pMgmt->syncWorker); dndCleanupWorker(&pMgmt->syncWorker);
} }
void mmInitMsgFp(SMnodeMgmt *pMgmt) { static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) {
SMnode *pMnode = mmAcquire(pMgmt);
}
static void mmSendRpcRsp(SDnode *pDnode, SRpcMsg *pRpc) {
if (pRpc->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRpc->code == TSDB_CODE_APP_NOT_READY) {
dmSendRedirectRsp(pDnode, pRpc);
} else {
rpcSendResponse(pRpc);
}
}
static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) {
SRpcConnInfo connInfo = {0};
if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
dError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
return -1;
}
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
pMsg->rpcMsg = *pRpc;
pMsg->createdTime = taosGetTimestampSec();
return 0;
}
void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
int32_t code = -1;
SMndMsg *pMsg = NULL;
MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)];
if (msgFp == NULL) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
goto _OVER;
}
pMsg = taosAllocateQitem(sizeof(SMndMsg));
if (pMsg == NULL) {
goto _OVER;
}
if (mmBuildMsg(pMsg, pRpc) != 0) {
goto _OVER;
}
dTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user);
if (pMgmt->singleProc) {
code = (*msgFp)(pDnode, pMsg);
} else {
code = taosProcPutToChildQueue(pMgmt->pProcess, pMsg, sizeof(SMndMsg), pRpc->pCont, pRpc->contLen);
}
_OVER:
if (code == 0) {
if (!pMgmt->singleProc) {
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
}
} else {
bool isReq = (pRpc->msgType & 1U);
if (isReq) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
mmSendRpcRsp(pDnode, &rsp);
}
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
}
}
int32_t mmProcessWriteMsg(SDnode *pDnode, SMndMsg *pMsg) {
return mmPutMndMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg);
}
int32_t mmProcessSyncMsg(SDnode *pDnode, SMndMsg *pMsg) {
return mmPutMndMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg);
}
int32_t mmProcessReadMsg(SDnode *pDnode, SMndMsg *pMsg) {
return mmPutMndMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg);
}
int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpc) {
return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpc);
}
int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpc) {
return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pRpc);
}
static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMsg *pMsg) {
SMnode *pMnode = mmAcquire(pDnode);
if (pMnode == NULL) return -1; if (pMnode == NULL) return -1;
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0); int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0);
mmRelease(pDnode, pMnode); mmRelease(pMgmt, pMnode);
return code; return code;
} }
static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc) { int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMndMsg *pMsg = taosAllocateQitem(sizeof(SMndMsg)); SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg);
}
int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
}
int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
}
static int32_t mmPutRpcMsgToWorker(SMgmtWrapper *pWrapper, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) { if (pMsg == NULL) {
return -1; return -1;
} }
dTrace("msg:%p, is created", pMsg); dTrace("msg:%p, is created", pMsg);
pMsg->rpcMsg = *pRpc; pMsg->rpcMsg = *pRpc;
pMsg->createdTime = taosGetTimestampSec();
int32_t code = mmPutMndMsgToWorker(pDnode, pWorker, pMsg); int32_t code = mmPutMsgToWorker(pWrapper->pMgmt, pWorker, pMsg);
if (code != 0) { if (code != 0) {
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
@ -189,88 +153,14 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs
return code; return code;
} }
void mmPutRpcRspToWorker(SDnode *pDnode, SRpcMsg *pRpc) { int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpc) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE);
int32_t code = -1; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mmPutRpcMsgToWorker(pWrapper, &pMgmt->writeWorker, pRpc);
if (pMgmt->singleProc) {
mmSendRpcRsp(pDnode, pRpc);
} else {
do {
code = taosProcPutToParentQueue(pMgmt->pProcess, pRpc, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen);
if (code != 0) {
taosMsleep(10);
}
} while (code != 0);
}
} }
void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpc) {
dTrace("msg:%p, get from child queue", pMsg); SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE);
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mmPutRpcMsgToWorker(pWrapper, &pMgmt->readWorker, pRpc);
SRpcMsg *pRpc = &pMsg->rpcMsg;
pRpc->pCont = pCont;
MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)];
int32_t code = (*msgFp)(pDnode, pMsg);
if (code != 0) {
bool isReq = (pRpc->msgType & 1U);
if (isReq) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
mmPutRpcRspToWorker(pDnode, &rsp);
}
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pCont);
}
} }
void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
dTrace("msg:%p, get from parent queue", pMsg);
pMsg->pCont = pCont;
mmSendRpcRsp(pDnode, pMsg);
free(pMsg);
}
static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) {
dTrace("msg:%p, get from msg queue", pMsg);
SMnode *pMnode = mmAcquire(pDnode);
SRpcMsg *pRpc = &pMsg->rpcMsg;
bool isReq = (pRpc->msgType & 1U);
int32_t code = -1;
if (pMnode != NULL) {
pMsg->pMnode = pMnode;
code = mndProcessMsg(pMsg);
mmRelease(pDnode, pMnode);
}
if (isReq) {
if (pMsg->rpcMsg.handle == NULL) return;
if (code == 0) {
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont};
mmPutRpcRspToWorker(pDnode, &rsp);
} else {
if (terrno != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont, .code = terrno};
mmPutRpcRspToWorker(pDnode, &rsp);
}
}
}
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pRpc->pCont);
taosFreeQitem(pMsg);
}
#endif
int32_t mmProcessWriteMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;}
int32_t mmProcessSyncMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;}
int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1;
}

View File

@ -348,13 +348,13 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize); pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize);
pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize); pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize);
if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
taosProcQueueCleanup(pProc->pChildQueue); taosProcQueueCleanup(pProc->pChildQueue);
free(pProc); free(pProc);
return NULL; return NULL;
} }
pProc->pChildQueue->name = pCfg->name; pProc->pChildQueue->name = pCfg->name;
pProc->pChildQueue->pParent = pCfg->pParent; pProc->pChildQueue->pParent = pCfg->pParent;
pProc->pChildQueue->mallocHeadFp = pCfg->childMallocHeadFp; pProc->pChildQueue->mallocHeadFp = pCfg->childMallocHeadFp;
pProc->pChildQueue->freeHeadFp = pCfg->childFreeHeadFp; pProc->pChildQueue->freeHeadFp = pCfg->childFreeHeadFp;
@ -436,8 +436,7 @@ int32_t taosProcRun(SProcObj *pProc) {
void taosProcStop(SProcObj *pProc) { void taosProcStop(SProcObj *pProc) {
pProc->stopFlag = true; pProc->stopFlag = true;
// todo // todo join
// join
} }
bool taosProcIsChild(SProcObj *pProc) { return pProc->isChild; } bool taosProcIsChild(SProcObj *pProc) { return pProc->isChild; }
@ -445,6 +444,7 @@ bool taosProcIsChild(SProcObj *pProc) { return pProc->isChild; }
void taosProcCleanup(SProcObj *pProc) { void taosProcCleanup(SProcObj *pProc) {
if (pProc != NULL) { if (pProc != NULL) {
uDebug("proc:%s, clean up", pProc->name); uDebug("proc:%s, clean up", pProc->name);
taosProcStop(pProc);
taosProcQueueCleanup(pProc->pChildQueue); taosProcQueueCleanup(pProc->pChildQueue);
taosProcQueueCleanup(pProc->pParentQueue); taosProcQueueCleanup(pProc->pParentQueue);
free(pProc); free(pProc);