shm
This commit is contained in:
parent
43083fa3e2
commit
ec1d86eecc
|
@ -41,7 +41,7 @@ void dndCleanup();
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t numOfSupportVnodes;
|
int32_t numOfSupportVnodes;
|
||||||
uint16_t serverPort;
|
uint16_t serverPort;
|
||||||
char dataDir[TSDB_FILENAME_LEN];
|
char dataDir[PATH_MAX];
|
||||||
char localEp[TSDB_EP_LEN];
|
char localEp[TSDB_EP_LEN];
|
||||||
char localFqdn[TSDB_FQDN_LEN];
|
char localFqdn[TSDB_FQDN_LEN];
|
||||||
char firstEp[TSDB_EP_LEN];
|
char firstEp[TSDB_EP_LEN];
|
||||||
|
|
|
@ -104,6 +104,9 @@ typedef struct SMgmtFp {
|
||||||
typedef struct SMgmtWrapper {
|
typedef struct SMgmtWrapper {
|
||||||
const char *name;
|
const char *name;
|
||||||
char *path;
|
char *path;
|
||||||
|
int32_t refCount;
|
||||||
|
bool deployed;
|
||||||
|
bool dropped;
|
||||||
bool required;
|
bool required;
|
||||||
EProcType procType;
|
EProcType procType;
|
||||||
SProcObj *pProc;
|
SProcObj *pProc;
|
||||||
|
|
|
@ -37,12 +37,19 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
|
||||||
TdFilePtr dndCheckRunning(char *dataDir);
|
TdFilePtr dndCheckRunning(char *dataDir);
|
||||||
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
|
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
|
||||||
|
|
||||||
|
// dndMsg.c
|
||||||
|
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
|
||||||
// dndNode.c
|
// dndNode.c
|
||||||
|
bool dndRequireNode(SMgmtWrapper *pWrapper);
|
||||||
|
int32_t dndOpenNode(SMgmtWrapper *pWrapper);
|
||||||
|
void dndCloseNode(SMgmtWrapper *pWrapper);
|
||||||
|
int32_t dndRun(SDnode *pDnode);
|
||||||
|
|
||||||
|
// dndObj.c
|
||||||
SDnode *dndCreate(const SDnodeOpt *pOption);
|
SDnode *dndCreate(const SDnodeOpt *pOption);
|
||||||
void dndClose(SDnode *pDnode);
|
void dndClose(SDnode *pDnode);
|
||||||
int32_t dndRun(SDnode *pDnode);
|
|
||||||
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
|
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
|
||||||
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
|
|
||||||
|
|
||||||
// dndTransport.c
|
// dndTransport.c
|
||||||
int32_t dndInitServer(SDnode *pDnode);
|
int32_t dndInitServer(SDnode *pDnode);
|
||||||
|
|
|
@ -16,6 +16,77 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "dndInt.h"
|
#include "dndInt.h"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
|
SRpcConnInfo connInfo = {0};
|
||||||
|
if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
|
||||||
|
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||||
|
dError("failed to build msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
|
||||||
|
pMsg->rpcMsg = *pRpc;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
|
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
|
||||||
|
dmUpdateMnodeEpSet(dndGetWrapper(pWrapper->pDnode, DNODE)->pMgmt, pEpSet);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = -1;
|
||||||
|
SNodeMsg *pMsg = NULL;
|
||||||
|
|
||||||
|
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
|
||||||
|
if (msgFp == NULL) {
|
||||||
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
||||||
|
if (pMsg == NULL) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dndBuildMsg(pMsg, pRpc, pEpSet) != 0) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user);
|
||||||
|
|
||||||
|
if (pWrapper->procType == PROC_SINGLE) {
|
||||||
|
code = (*msgFp)(pWrapper->pMgmt, pMsg);
|
||||||
|
} else if (pWrapper->procType == PROC_PARENT) {
|
||||||
|
code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_MEMORY_CORRUPTED;
|
||||||
|
dError("msg:%p, won't be processed for it is child process", pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
|
||||||
|
if (code == 0) {
|
||||||
|
if (pWrapper->procType == PROC_PARENT) {
|
||||||
|
dTrace("msg:%p, is freed", pMsg);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
rpcFreeCont(pRpc->pCont);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
dError("msg:%p, failed to process since %s", pMsg, terrstr());
|
||||||
|
bool isReq = (pRpc->msgType & 1U);
|
||||||
|
if (isReq) {
|
||||||
|
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
|
||||||
|
dndSendRsp(pWrapper, &rsp);
|
||||||
|
}
|
||||||
|
dTrace("msg:%p, is freed", pMsg);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
rpcFreeCont(pRpc->pCont);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static SMgmtWrapper *dndGetWrapperFromMsg(SDnode *pDnode, SNodeMsg *pMsg) {
|
static SMgmtWrapper *dndGetWrapperFromMsg(SDnode *pDnode, SNodeMsg *pMsg) {
|
||||||
SMgmtWrapper *pWrapper = NULL;
|
SMgmtWrapper *pWrapper = NULL;
|
||||||
switch (pMsg->rpcMsg.msgType) {
|
switch (pMsg->rpcMsg.msgType) {
|
||||||
|
|
|
@ -25,7 +25,7 @@ static void dndResetLog(SMgmtWrapper *pMgmt) {
|
||||||
taosInitLog(logname, 1);
|
taosInitLog(logname, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool dndRequireNode(SMgmtWrapper *pMgmt) {
|
bool dndRequireNode(SMgmtWrapper *pMgmt) {
|
||||||
bool required = (*pMgmt->fp.requiredFp)(pMgmt);
|
bool required = (*pMgmt->fp.requiredFp)(pMgmt);
|
||||||
if (!required) {
|
if (!required) {
|
||||||
dDebug("node:%s, no need to start", pMgmt->name);
|
dDebug("node:%s, no need to start", pMgmt->name);
|
||||||
|
@ -35,9 +35,9 @@ static bool dndRequireNode(SMgmtWrapper *pMgmt) {
|
||||||
return required;
|
return required;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dndOpenNode(SMgmtWrapper *pWrapper) { return (*pWrapper->fp.openFp)(pWrapper); }
|
int32_t dndOpenNode(SMgmtWrapper *pWrapper) { return (*pWrapper->fp.openFp)(pWrapper); }
|
||||||
|
|
||||||
static void dndCloseNode(SMgmtWrapper *pWrapper) {
|
void dndCloseNode(SMgmtWrapper *pWrapper) {
|
||||||
if (pWrapper->required) {
|
if (pWrapper->required) {
|
||||||
(*pWrapper->fp.closeFp)(pWrapper);
|
(*pWrapper->fp.closeFp)(pWrapper);
|
||||||
pWrapper->required = false;
|
pWrapper->required = false;
|
||||||
|
@ -48,137 +48,6 @@ static void dndCloseNode(SMgmtWrapper *pWrapper) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) {
|
|
||||||
pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes;
|
|
||||||
pDnode->serverPort = pOption->serverPort;
|
|
||||||
pDnode->dataDir = strdup(pOption->dataDir);
|
|
||||||
pDnode->localEp = strdup(pOption->localEp);
|
|
||||||
pDnode->localFqdn = strdup(pOption->localFqdn);
|
|
||||||
pDnode->firstEp = strdup(pOption->firstEp);
|
|
||||||
pDnode->secondEp = strdup(pOption->secondEp);
|
|
||||||
pDnode->pDisks = pOption->pDisks;
|
|
||||||
pDnode->numOfDisks = pOption->numOfDisks;
|
|
||||||
pDnode->rebootTime = taosGetTimestampMs();
|
|
||||||
|
|
||||||
if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL ||
|
|
||||||
pDnode->secondEp == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndClearMemory(SDnode *pDnode) {
|
|
||||||
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
|
||||||
SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
|
|
||||||
tfree(pMgmt->path);
|
|
||||||
}
|
|
||||||
if (pDnode->pLockFile != NULL) {
|
|
||||||
taosUnLockFile(pDnode->pLockFile);
|
|
||||||
taosCloseFile(&pDnode->pLockFile);
|
|
||||||
pDnode->pLockFile = NULL;
|
|
||||||
}
|
|
||||||
tfree(pDnode->localEp);
|
|
||||||
tfree(pDnode->localFqdn);
|
|
||||||
tfree(pDnode->firstEp);
|
|
||||||
tfree(pDnode->secondEp);
|
|
||||||
tfree(pDnode->dataDir);
|
|
||||||
free(pDnode);
|
|
||||||
dDebug("dnode object memory is cleared, data:%p", pDnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
SDnode *dndCreate(const SDnodeOpt *pOption) {
|
|
||||||
dInfo("start to create dnode object");
|
|
||||||
int32_t code = -1;
|
|
||||||
char path[PATH_MAX + 100];
|
|
||||||
SDnode *pDnode = NULL;
|
|
||||||
|
|
||||||
pDnode = calloc(1, sizeof(SDnode));
|
|
||||||
if (pDnode == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dndInitMemory(pDnode, pOption) != 0) {
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
dndSetStatus(pDnode, DND_STAT_INIT);
|
|
||||||
pDnode->pLockFile = dndCheckRunning(pDnode->dataDir);
|
|
||||||
if (pDnode->pLockFile == NULL) {
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dndInitServer(pDnode) != 0) {
|
|
||||||
dError("failed to init trans server since %s", terrstr());
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dndInitClient(pDnode) != 0) {
|
|
||||||
dError("failed to init trans client since %s", terrstr());
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
dmGetMgmtFp(&pDnode->wrappers[DNODE]);
|
|
||||||
mmGetMgmtFp(&pDnode->wrappers[MNODE]);
|
|
||||||
vmGetMgmtFp(&pDnode->wrappers[VNODES]);
|
|
||||||
qmGetMgmtFp(&pDnode->wrappers[QNODE]);
|
|
||||||
smGetMgmtFp(&pDnode->wrappers[SNODE]);
|
|
||||||
bmGetMgmtFp(&pDnode->wrappers[BNODE]);
|
|
||||||
|
|
||||||
if (dndInitMsgHandle(pDnode) != 0) {
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
|
||||||
snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name);
|
|
||||||
pWrapper->path = strdup(path);
|
|
||||||
pWrapper->pDnode = pDnode;
|
|
||||||
if (pWrapper->path == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
pWrapper->procType = PROC_SINGLE;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = 0;
|
|
||||||
|
|
||||||
_OVER:
|
|
||||||
if (code != 0 && pDnode) {
|
|
||||||
dndClearMemory(pDnode);
|
|
||||||
dError("failed to create dnode object since %s", terrstr());
|
|
||||||
} else {
|
|
||||||
dInfo("dnode object is created, data:%p", pDnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pDnode;
|
|
||||||
}
|
|
||||||
|
|
||||||
void dndClose(SDnode *pDnode) {
|
|
||||||
if (pDnode == NULL) return;
|
|
||||||
|
|
||||||
if (dndGetStatus(pDnode) == DND_STAT_STOPPED) {
|
|
||||||
dError("dnode is shutting down, data:%p", pDnode);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
dInfo("start to close dnode, data:%p", pDnode);
|
|
||||||
dndSetStatus(pDnode, DND_STAT_STOPPED);
|
|
||||||
|
|
||||||
dndCleanupServer(pDnode);
|
|
||||||
dndCleanupClient(pDnode);
|
|
||||||
|
|
||||||
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
|
||||||
dndCloseNode(pWrapper);
|
|
||||||
}
|
|
||||||
|
|
||||||
dndClearMemory(pDnode);
|
|
||||||
dInfo("dnode object is closed, data:%p", pDnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
|
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
|
||||||
dInfo("dnode run in single process mode");
|
dInfo("dnode run in single process mode");
|
||||||
|
|
||||||
|
@ -350,77 +219,3 @@ int32_t dndRun(SDnode *pDnode) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
|
|
||||||
dInfo("dnode object receive event %d, data:%p", event, pDnode);
|
|
||||||
pDnode->event = event;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|
||||||
SRpcConnInfo connInfo = {0};
|
|
||||||
if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
|
|
||||||
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
|
||||||
dError("failed to build msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
|
|
||||||
pMsg->rpcMsg = *pRpc;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|
||||||
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
|
|
||||||
dmUpdateMnodeEpSet(dndGetWrapper(pWrapper->pDnode, DNODE)->pMgmt, pEpSet);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = -1;
|
|
||||||
SNodeMsg *pMsg = NULL;
|
|
||||||
|
|
||||||
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
|
|
||||||
if (msgFp == NULL) {
|
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
|
||||||
if (pMsg == NULL) {
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dndBuildMsg(pMsg, pRpc, pEpSet) != 0) {
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user);
|
|
||||||
|
|
||||||
if (pWrapper->procType == PROC_SINGLE) {
|
|
||||||
code = (*msgFp)(pWrapper->pMgmt, pMsg);
|
|
||||||
} else if (pWrapper->procType == PROC_PARENT) {
|
|
||||||
code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen);
|
|
||||||
} else {
|
|
||||||
terrno = TSDB_CODE_MEMORY_CORRUPTED;
|
|
||||||
dError("msg:%p, won't be processed for it is child process", pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
_OVER:
|
|
||||||
|
|
||||||
if (code == 0) {
|
|
||||||
if (pWrapper->procType == PROC_PARENT) {
|
|
||||||
dTrace("msg:%p, is freed", pMsg);
|
|
||||||
taosFreeQitem(pMsg);
|
|
||||||
rpcFreeCont(pRpc->pCont);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
dError("msg:%p, failed to process since %s", pMsg, terrstr());
|
|
||||||
bool isReq = (pRpc->msgType & 1U);
|
|
||||||
if (isReq) {
|
|
||||||
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
|
|
||||||
dndSendRsp(pWrapper, &rsp);
|
|
||||||
}
|
|
||||||
dTrace("msg:%p, is freed", pMsg);
|
|
||||||
taosFreeQitem(pMsg);
|
|
||||||
rpcFreeCont(pRpc->pCont);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,138 @@
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "dndInt.h"
|
||||||
|
|
||||||
|
static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) {
|
||||||
|
pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes;
|
||||||
|
pDnode->serverPort = pOption->serverPort;
|
||||||
|
pDnode->dataDir = strdup(pOption->dataDir);
|
||||||
|
pDnode->localEp = strdup(pOption->localEp);
|
||||||
|
pDnode->localFqdn = strdup(pOption->localFqdn);
|
||||||
|
pDnode->firstEp = strdup(pOption->firstEp);
|
||||||
|
pDnode->secondEp = strdup(pOption->secondEp);
|
||||||
|
pDnode->pDisks = pOption->pDisks;
|
||||||
|
pDnode->numOfDisks = pOption->numOfDisks;
|
||||||
|
pDnode->rebootTime = taosGetTimestampMs();
|
||||||
|
|
||||||
|
if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL ||
|
||||||
|
pDnode->secondEp == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndClearMemory(SDnode *pDnode) {
|
||||||
|
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
||||||
|
SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
|
||||||
|
tfree(pMgmt->path);
|
||||||
|
}
|
||||||
|
if (pDnode->pLockFile != NULL) {
|
||||||
|
taosUnLockFile(pDnode->pLockFile);
|
||||||
|
taosCloseFile(&pDnode->pLockFile);
|
||||||
|
pDnode->pLockFile = NULL;
|
||||||
|
}
|
||||||
|
tfree(pDnode->localEp);
|
||||||
|
tfree(pDnode->localFqdn);
|
||||||
|
tfree(pDnode->firstEp);
|
||||||
|
tfree(pDnode->secondEp);
|
||||||
|
tfree(pDnode->dataDir);
|
||||||
|
free(pDnode);
|
||||||
|
dDebug("dnode object memory is cleared, data:%p", pDnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
SDnode *dndCreate(const SDnodeOpt *pOption) {
|
||||||
|
dInfo("start to create dnode object");
|
||||||
|
int32_t code = -1;
|
||||||
|
char path[PATH_MAX];
|
||||||
|
SDnode *pDnode = NULL;
|
||||||
|
|
||||||
|
pDnode = calloc(1, sizeof(SDnode));
|
||||||
|
if (pDnode == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dndInitMemory(pDnode, pOption) != 0) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
dndSetStatus(pDnode, DND_STAT_INIT);
|
||||||
|
pDnode->pLockFile = dndCheckRunning(pDnode->dataDir);
|
||||||
|
if (pDnode->pLockFile == NULL) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dndInitServer(pDnode) != 0) {
|
||||||
|
dError("failed to init trans server since %s", terrstr());
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dndInitClient(pDnode) != 0) {
|
||||||
|
dError("failed to init trans client since %s", terrstr());
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
dmGetMgmtFp(&pDnode->wrappers[DNODE]);
|
||||||
|
mmGetMgmtFp(&pDnode->wrappers[MNODE]);
|
||||||
|
vmGetMgmtFp(&pDnode->wrappers[VNODES]);
|
||||||
|
qmGetMgmtFp(&pDnode->wrappers[QNODE]);
|
||||||
|
smGetMgmtFp(&pDnode->wrappers[SNODE]);
|
||||||
|
bmGetMgmtFp(&pDnode->wrappers[BNODE]);
|
||||||
|
|
||||||
|
if (dndInitMsgHandle(pDnode) != 0) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
||||||
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
|
snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name);
|
||||||
|
pWrapper->path = strdup(path);
|
||||||
|
pWrapper->pDnode = pDnode;
|
||||||
|
if (pWrapper->path == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pWrapper->procType = PROC_SINGLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
if (code != 0 && pDnode) {
|
||||||
|
dndClearMemory(pDnode);
|
||||||
|
dError("failed to create dnode object since %s", terrstr());
|
||||||
|
} else {
|
||||||
|
dInfo("dnode object is created, data:%p", pDnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pDnode;
|
||||||
|
}
|
||||||
|
|
||||||
|
void dndClose(SDnode *pDnode) {
|
||||||
|
if (pDnode == NULL) return;
|
||||||
|
|
||||||
|
if (dndGetStatus(pDnode) == DND_STAT_STOPPED) {
|
||||||
|
dError("dnode is shutting down, data:%p", pDnode);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
dInfo("start to close dnode, data:%p", pDnode);
|
||||||
|
dndSetStatus(pDnode, DND_STAT_STOPPED);
|
||||||
|
|
||||||
|
dndCleanupServer(pDnode);
|
||||||
|
dndCleanupClient(pDnode);
|
||||||
|
|
||||||
|
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
||||||
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
|
dndCloseNode(pWrapper);
|
||||||
|
}
|
||||||
|
|
||||||
|
dndClearMemory(pDnode);
|
||||||
|
dInfo("dnode object is closed, data:%p", pDnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
|
||||||
|
dInfo("dnode object receive event %d, data:%p", event, pDnode);
|
||||||
|
pDnode->event = event;
|
||||||
|
}
|
Loading…
Reference in New Issue