refact(cluster): node mgmt

This commit is contained in:
Shengliang Guan 2022-04-13 15:52:14 +08:00
parent 8522c27c60
commit 3663c6441d
15 changed files with 72 additions and 83 deletions

View File

@ -68,6 +68,7 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption);
* @param pMnode The mnode object.
*/
int32_t mndStart(SMnode *pMnode);
void mndStop(SMnode *pMnode);
/**
* @brief Get mnode monitor info.

View File

@ -29,6 +29,7 @@ void dmCloseNode(SMgmtWrapper *pWrapper);
int32_t dmInitTrans(SDnode *pDnode);
void dmCleanupTrans(SDnode *pDnode);
SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper);
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper);
int32_t dmInitMsgHandle(SDnode *pDnode);
void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp);

View File

@ -124,7 +124,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
return dmOpenNodeImp(pWrapper);
} else if (pDnode->ptype == DND_PROC_PARENT) {
if (dmInitNodeProc(pWrapper) != 0) return -1;
if (dmWriteShmFile(pDnode) != 0) return -1;
if (dmWriteShmFile(pWrapper) != 0) return -1;
if (dmRunNodeProc(pWrapper) != 0) return -1;
}
return 0;
@ -226,11 +226,11 @@ static int32_t dmRunInParentProcess(SDnode *pDnode) {
pWrapper->required = dmRequireNode(pWrapper);
if (!pWrapper->required) continue;
if (dmInitNodeProc(pWrapper) != 0) return -1;
}
if (dmWriteShmFile(pDnode) != 0) {
dError("failed to write runtime file since %s", terrstr());
return -1;
if (dmWriteShmFile(pWrapper) != 0) {
dError("failed to write runtime file since %s", terrstr());
return -1;
}
}
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {

View File

@ -72,7 +72,7 @@ static void dmClearVars(SDnode *pDnode) {
}
SDnode *dmCreate(const SDnodeOpt *pOption) {
dDebug("start to create dnode object");
dDebug("start to create dnode");
int32_t code = -1;
char path[PATH_MAX] = {0};
SDnode *pDnode = NULL;
@ -96,20 +96,25 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
smSetMgmtFp(&pDnode->wrappers[SNODE]);
bmSetMgmtFp(&pDnode->wrappers[BNODE]);
for (EDndNodeType n = 0; n < NODE_END; ++n) {
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
snprintf(path, sizeof(path), "%s%s%s", pDnode->data.dataDir, TD_DIRSEP, pWrapper->name);
pWrapper->path = strdup(path);
pWrapper->procShm.id = -1;
pWrapper->pDnode = pDnode;
pWrapper->ntype = n;
pWrapper->procType = DND_PROC_SINGLE;
taosInitRWLatch(&pWrapper->latch);
if (pWrapper->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
pWrapper->procType = DND_PROC_SINGLE;
taosInitRWLatch(&pWrapper->latch);
if (n != DNODE && dmReadShmFile(pWrapper) != 0) {
dError("node:%s, failed to read shm file since %s", pWrapper->name, terrstr());
goto _OVER;
}
}
if (dmInitMsgHandle(pDnode) != 0) {
@ -117,13 +122,8 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
goto _OVER;
}
if (dmReadShmFile(pDnode) != 0) {
dError("failed to read shm file since %s", terrstr());
goto _OVER;
}
SMsgCb msgCb = dmGetMsgcb(&pDnode->wrappers[0]);
tmsgSetDefaultMsgCb(&msgCb);
pDnode->data.msgCb = dmGetMsgcb(&pDnode->wrappers[DNODE]);
tmsgSetDefaultMsgCb(&pDnode->data.msgCb);
dInfo("dnode is created, data:%p", pDnode);
code = 0;

View File

@ -503,15 +503,6 @@ static void dmCleanupServer(SDnode *pDnode) {
int32_t dmInitTrans(SDnode *pDnode) {
if (dmInitServer(pDnode) != 0) return -1;
if (dmInitClient(pDnode) != 0) return -1;
SMsgCb msgCb = {
.sendReqFp = dmSendReq,
.sendRspFp = dmSendRsp,
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
.releaseHandleFp = dmReleaseHandle,
};
pDnode->data.msgCb = msgCb;
return 0;
}
@ -519,3 +510,14 @@ void dmCleanupTrans(SDnode *pDnode) {
dmCleanupServer(pDnode);
dmCleanupClient(pDnode);
}
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
SMsgCb msgCb = {
.sendReqFp = dmSendReq,
.sendRspFp = dmSendRsp,
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
.releaseHandleFp = dmReleaseHandle,
.pWrapper = pWrapper,
};
return msgCb;
}

View File

@ -37,14 +37,13 @@ void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgF
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
void dmGetMonitorSysInfo(SMonSysInfo *pInfo);
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper);
// dmFile.c
int32_t dmReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
int32_t dmWriteFile(SMgmtWrapper *pWrapper, bool deployed);
TdFilePtr dmCheckRunning(const char *dataDir);
int32_t dmReadShmFile(SDnode *pDnode);
int32_t dmWriteShmFile(SDnode *pDnode);
int32_t dmReadShmFile(SMgmtWrapper *pWrapper);
int32_t dmWriteShmFile(SMgmtWrapper *pWrapper);
#ifdef __cplusplus
}

View File

@ -140,18 +140,17 @@ TdFilePtr dmCheckRunning(const char *dataDir) {
return pFile;
}
int32_t dmReadShmFile(SDnode *pDnode) {
int32_t dmReadShmFile(SMgmtWrapper *pWrapper) {
int32_t code = -1;
char itemName[24] = {0};
char content[MAXLEN + 1] = {0};
char file[PATH_MAX] = {0};
cJSON *root = NULL;
TdFilePtr pFile = NULL;
snprintf(file, sizeof(file), "%s%s.shmfile", pDnode->data.dataDir, TD_DIRSEP);
snprintf(file, sizeof(file), "%s%sshmfile", pWrapper->path, TD_DIRSEP);
pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) {
dDebug("file %s not exist", file);
dDebug("node:%s, file %s not exist", pWrapper->name, file);
code = 0;
goto _OVER;
}
@ -160,35 +159,27 @@ int32_t dmReadShmFile(SDnode *pDnode) {
root = cJSON_Parse(content);
if (root == NULL) {
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
dError("failed to read %s since invalid json format", file);
dError("node:%s, failed to read %s since invalid json format", pWrapper->name, file);
goto _OVER;
}
for (EDndNodeType ntype = DNODE + 1; ntype < NODE_END; ++ntype) {
snprintf(itemName, sizeof(itemName), "%s_shmid", dmProcName(ntype));
cJSON *shmid = cJSON_GetObjectItem(root, itemName);
if (shmid && shmid->type == cJSON_Number) {
pDnode->wrappers[ntype].procShm.id = shmid->valueint;
}
cJSON *shmid = cJSON_GetObjectItem(root, "shmid");
if (shmid && shmid->type == cJSON_Number) {
pWrapper->procShm.id = shmid->valueint;
}
snprintf(itemName, sizeof(itemName), "%s_shmsize", dmProcName(ntype));
cJSON *shmsize = cJSON_GetObjectItem(root, itemName);
if (shmsize && shmsize->type == cJSON_Number) {
pDnode->wrappers[ntype].procShm.size = shmsize->valueint;
}
cJSON *shmsize = cJSON_GetObjectItem(root, "shmsize");
if (shmsize && shmsize->type == cJSON_Number) {
pWrapper->procShm.size = shmsize->valueint;
}
}
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_END) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
if (pWrapper->procShm.id >= 0) {
dDebug("shmid:%d, is closed, size:%d", pWrapper->procShm.id, pWrapper->procShm.size);
taosDropShm(&pWrapper->procShm);
}
if (!tsMultiProcess || pWrapper->pDnode->ntype == DNODE || pWrapper->pDnode->ntype == NODE_END) {
if (pWrapper->procShm.id >= 0) {
dDebug("node:%s, shmid:%d, is closed, size:%d", pWrapper->name, pWrapper->procShm.id, pWrapper->procShm.size);
taosDropShm(&pWrapper->procShm);
}
} else {
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
if (taosAttachShm(&pWrapper->procShm) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("shmid:%d, failed to attach shm since %s", pWrapper->procShm.id, terrstr());
@ -197,7 +188,7 @@ int32_t dmReadShmFile(SDnode *pDnode) {
dInfo("node:%s, shmid:%d is attached, size:%d", pWrapper->name, pWrapper->procShm.id, pWrapper->procShm.size);
}
dDebug("successed to load %s", file);
dDebug("node:%s, successed to load %s", pWrapper->name, file);
code = 0;
_OVER:
@ -207,7 +198,7 @@ _OVER:
return code;
}
int32_t dmWriteShmFile(SDnode *pDnode) {
int32_t dmWriteShmFile(SMgmtWrapper *pWrapper) {
int32_t code = -1;
int32_t len = 0;
char content[MAXLEN + 1] = {0};
@ -215,37 +206,30 @@ int32_t dmWriteShmFile(SDnode *pDnode) {
char realfile[PATH_MAX] = {0};
TdFilePtr pFile = NULL;
snprintf(file, sizeof(file), "%s%s.shmfile.bak", pDnode->data.dataDir, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%s.shmfile", pDnode->data.dataDir, TD_DIRSEP);
snprintf(file, sizeof(file), "%s%sshmfile.bak", pWrapper->path, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%sshmfile", pWrapper->path, TD_DIRSEP);
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to open file:%s since %s", file, terrstr());
dError("node:%s, failed to open file:%s since %s", pWrapper->name, file, terrstr());
goto _OVER;
}
len += snprintf(content + len, MAXLEN - len, "{\n");
for (EDndNodeType ntype = DNODE + 1; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\":%d,\n", dmProcName(ntype), pWrapper->procShm.id);
if (ntype == NODE_END - 1) {
len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d\n", dmProcName(ntype), pWrapper->procShm.size);
} else {
len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d,\n", dmProcName(ntype), pWrapper->procShm.size);
}
}
len += snprintf(content + len, MAXLEN - len, " \"shmid\":%d,\n", pWrapper->procShm.id);
len += snprintf(content + len, MAXLEN - len, " \"shmsize\":%d\n", pWrapper->procShm.size);
len += snprintf(content + len, MAXLEN - len, "}\n");
if (taosWriteFile(pFile, content, len) != len) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to write file:%s since %s", file, terrstr());
dError("node:%s, failed to write file:%s since %s", pWrapper->name, file, terrstr());
goto _OVER;
}
if (taosFsyncFile(pFile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to fsync file:%s since %s", file, terrstr());
dError("node:%s, failed to fsync file:%s since %s", pWrapper->name, file, terrstr());
goto _OVER;
}
@ -253,11 +237,11 @@ int32_t dmWriteShmFile(SDnode *pDnode) {
if (taosRenameFile(file, realfile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to rename %s to %s since %s", file, realfile, terrstr());
dError("node:%s, failed to rename %s to %s since %s", pWrapper->name, file, realfile, terrstr());
return -1;
}
dInfo("successed to write %s", realfile);
dInfo("node:%s, successed to write %s", pWrapper->name, realfile);
code = 0;
_OVER:

View File

@ -171,9 +171,3 @@ void dmGetMonitorSysInfo(SMonSysInfo *pInfo) {
taosGetCardInfoDelta(&pInfo->net_in, &pInfo->net_out);
taosGetProcIODelta(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk);
}
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
SMsgCb msgCb = pWrapper->pDnode->data.msgCb;
msgCb.pWrapper = pWrapper;
return msgCb;
}

View File

@ -19,7 +19,7 @@
static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); }
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper);
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;;
pOption->msgCb = msgCb;
}

View File

@ -39,7 +39,7 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) {
}
static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper);
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue;
msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue;
msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue;
@ -225,11 +225,18 @@ static int32_t mmStart(SMgmtWrapper *pWrapper) {
return mndStart(pMgmt->pMnode);
}
static void mmStop(SMgmtWrapper *pWrapper) {
dDebug("mnode-mgmt start to stop");
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mndStop(pMgmt->pMnode);
}
void mmSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = mmOpen;
mgmtFp.closeFp = mmClose;
mgmtFp.startFp = mmStart;
mgmtFp.stopFp = mmStop;
mgmtFp.createFp = mmProcessCreateReq;
mgmtFp.dropFp = mmProcessDropReq;
mgmtFp.requiredFp = mmRequire;

View File

@ -19,7 +19,7 @@
static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); }
static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper);
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue;
msgCb.qsizeFp = qmGetQueueSize;

View File

@ -19,7 +19,7 @@
static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); }
static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper);
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
pOption->msgCb = msgCb;
}

View File

@ -143,7 +143,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper);
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;

View File

@ -128,7 +128,7 @@ static void *vmOpenVnodeFunc(void *param) {
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
dmReportStartup(pDnode, "open-vnodes", stepDesc);
SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper);
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;

View File

@ -220,7 +220,6 @@ static int32_t mndInitSteps(SMnode *pMnode, bool deploy) {
if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return -1;
return 0;
}
@ -346,6 +345,8 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
int32_t mndStart(SMnode *pMnode) { return mndInitTimer(pMnode); }
void mndStop(SMnode *pMnode) { return mndCleanupTimer(pMnode); }
int32_t mndProcessMsg(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
SRpcMsg *pRpc = &pMsg->rpcMsg;