diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 08ab63e55a..1d5ed1b6d2 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -68,6 +68,7 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption); * @param pMnode The mnode object. */ int32_t mndStart(SMnode *pMnode); +void mndStop(SMnode *pMnode); /** * @brief Get mnode monitor info. diff --git a/source/dnode/mgmt/implement/inc/dmImp.h b/source/dnode/mgmt/implement/inc/dmImp.h index 61dd7800b1..52a56305fd 100644 --- a/source/dnode/mgmt/implement/inc/dmImp.h +++ b/source/dnode/mgmt/implement/inc/dmImp.h @@ -29,6 +29,7 @@ void dmCloseNode(SMgmtWrapper *pWrapper); int32_t dmInitTrans(SDnode *pDnode); void dmCleanupTrans(SDnode *pDnode); SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper); +SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper); int32_t dmInitMsgHandle(SDnode *pDnode); void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp); diff --git a/source/dnode/mgmt/implement/src/dmExec.c b/source/dnode/mgmt/implement/src/dmExec.c index 376742e825..354e2372a4 100644 --- a/source/dnode/mgmt/implement/src/dmExec.c +++ b/source/dnode/mgmt/implement/src/dmExec.c @@ -124,7 +124,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { return dmOpenNodeImp(pWrapper); } else if (pDnode->ptype == DND_PROC_PARENT) { if (dmInitNodeProc(pWrapper) != 0) return -1; - if (dmWriteShmFile(pDnode) != 0) return -1; + if (dmWriteShmFile(pWrapper) != 0) return -1; if (dmRunNodeProc(pWrapper) != 0) return -1; } return 0; @@ -226,11 +226,11 @@ static int32_t dmRunInParentProcess(SDnode *pDnode) { pWrapper->required = dmRequireNode(pWrapper); if (!pWrapper->required) continue; if (dmInitNodeProc(pWrapper) != 0) return -1; - } - if (dmWriteShmFile(pDnode) != 0) { - dError("failed to write runtime file since %s", terrstr()); - return -1; + if (dmWriteShmFile(pWrapper) != 0) { + dError("failed to write runtime file since %s", terrstr()); + return -1; + } } for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) { diff --git a/source/dnode/mgmt/implement/src/dmObj.c b/source/dnode/mgmt/implement/src/dmObj.c index cbe3b29303..9236a1f24b 100644 --- a/source/dnode/mgmt/implement/src/dmObj.c +++ b/source/dnode/mgmt/implement/src/dmObj.c @@ -72,7 +72,7 @@ static void dmClearVars(SDnode *pDnode) { } SDnode *dmCreate(const SDnodeOpt *pOption) { - dDebug("start to create dnode object"); + dDebug("start to create dnode"); int32_t code = -1; char path[PATH_MAX] = {0}; SDnode *pDnode = NULL; @@ -96,20 +96,25 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { smSetMgmtFp(&pDnode->wrappers[SNODE]); bmSetMgmtFp(&pDnode->wrappers[BNODE]); - for (EDndNodeType n = 0; n < NODE_END; ++n) { + for (EDndNodeType n = DNODE; n < NODE_END; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; snprintf(path, sizeof(path), "%s%s%s", pDnode->data.dataDir, TD_DIRSEP, pWrapper->name); pWrapper->path = strdup(path); pWrapper->procShm.id = -1; pWrapper->pDnode = pDnode; pWrapper->ntype = n; + pWrapper->procType = DND_PROC_SINGLE; + taosInitRWLatch(&pWrapper->latch); + if (pWrapper->path == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } - pWrapper->procType = DND_PROC_SINGLE; - taosInitRWLatch(&pWrapper->latch); + if (n != DNODE && dmReadShmFile(pWrapper) != 0) { + dError("node:%s, failed to read shm file since %s", pWrapper->name, terrstr()); + goto _OVER; + } } if (dmInitMsgHandle(pDnode) != 0) { @@ -117,13 +122,8 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { goto _OVER; } - if (dmReadShmFile(pDnode) != 0) { - dError("failed to read shm file since %s", terrstr()); - goto _OVER; - } - - SMsgCb msgCb = dmGetMsgcb(&pDnode->wrappers[0]); - tmsgSetDefaultMsgCb(&msgCb); + pDnode->data.msgCb = dmGetMsgcb(&pDnode->wrappers[DNODE]); + tmsgSetDefaultMsgCb(&pDnode->data.msgCb); dInfo("dnode is created, data:%p", pDnode); code = 0; diff --git a/source/dnode/mgmt/implement/src/dmTransport.c b/source/dnode/mgmt/implement/src/dmTransport.c index d746c44262..d5328cb8f6 100644 --- a/source/dnode/mgmt/implement/src/dmTransport.c +++ b/source/dnode/mgmt/implement/src/dmTransport.c @@ -503,15 +503,6 @@ static void dmCleanupServer(SDnode *pDnode) { int32_t dmInitTrans(SDnode *pDnode) { if (dmInitServer(pDnode) != 0) return -1; if (dmInitClient(pDnode) != 0) return -1; - - SMsgCb msgCb = { - .sendReqFp = dmSendReq, - .sendRspFp = dmSendRsp, - .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, - .releaseHandleFp = dmReleaseHandle, - }; - pDnode->data.msgCb = msgCb; - return 0; } @@ -519,3 +510,14 @@ void dmCleanupTrans(SDnode *pDnode) { dmCleanupServer(pDnode); dmCleanupClient(pDnode); } + +SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) { + SMsgCb msgCb = { + .sendReqFp = dmSendReq, + .sendRspFp = dmSendRsp, + .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, + .releaseHandleFp = dmReleaseHandle, + .pWrapper = pWrapper, + }; + return msgCb; +} \ No newline at end of file diff --git a/source/dnode/mgmt/interface/inc/dmInt.h b/source/dnode/mgmt/interface/inc/dmInt.h index 9e8ac82195..3c8ecdc71b 100644 --- a/source/dnode/mgmt/interface/inc/dmInt.h +++ b/source/dnode/mgmt/interface/inc/dmInt.h @@ -37,14 +37,13 @@ void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgF void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); void dmGetMonitorSysInfo(SMonSysInfo *pInfo); -SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper); // dmFile.c int32_t dmReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); int32_t dmWriteFile(SMgmtWrapper *pWrapper, bool deployed); TdFilePtr dmCheckRunning(const char *dataDir); -int32_t dmReadShmFile(SDnode *pDnode); -int32_t dmWriteShmFile(SDnode *pDnode); +int32_t dmReadShmFile(SMgmtWrapper *pWrapper); +int32_t dmWriteShmFile(SMgmtWrapper *pWrapper); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/interface/src/dmFile.c b/source/dnode/mgmt/interface/src/dmFile.c index c994357e76..3256b8b4b0 100644 --- a/source/dnode/mgmt/interface/src/dmFile.c +++ b/source/dnode/mgmt/interface/src/dmFile.c @@ -140,18 +140,17 @@ TdFilePtr dmCheckRunning(const char *dataDir) { return pFile; } -int32_t dmReadShmFile(SDnode *pDnode) { +int32_t dmReadShmFile(SMgmtWrapper *pWrapper) { int32_t code = -1; - char itemName[24] = {0}; char content[MAXLEN + 1] = {0}; char file[PATH_MAX] = {0}; cJSON *root = NULL; TdFilePtr pFile = NULL; - snprintf(file, sizeof(file), "%s%s.shmfile", pDnode->data.dataDir, TD_DIRSEP); + snprintf(file, sizeof(file), "%s%sshmfile", pWrapper->path, TD_DIRSEP); pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { - dDebug("file %s not exist", file); + dDebug("node:%s, file %s not exist", pWrapper->name, file); code = 0; goto _OVER; } @@ -160,35 +159,27 @@ int32_t dmReadShmFile(SDnode *pDnode) { root = cJSON_Parse(content); if (root == NULL) { terrno = TSDB_CODE_INVALID_JSON_FORMAT; - dError("failed to read %s since invalid json format", file); + dError("node:%s, failed to read %s since invalid json format", pWrapper->name, file); goto _OVER; } - for (EDndNodeType ntype = DNODE + 1; ntype < NODE_END; ++ntype) { - snprintf(itemName, sizeof(itemName), "%s_shmid", dmProcName(ntype)); - cJSON *shmid = cJSON_GetObjectItem(root, itemName); - if (shmid && shmid->type == cJSON_Number) { - pDnode->wrappers[ntype].procShm.id = shmid->valueint; - } + cJSON *shmid = cJSON_GetObjectItem(root, "shmid"); + if (shmid && shmid->type == cJSON_Number) { + pWrapper->procShm.id = shmid->valueint; + } - snprintf(itemName, sizeof(itemName), "%s_shmsize", dmProcName(ntype)); - cJSON *shmsize = cJSON_GetObjectItem(root, itemName); - if (shmsize && shmsize->type == cJSON_Number) { - pDnode->wrappers[ntype].procShm.size = shmsize->valueint; - } + cJSON *shmsize = cJSON_GetObjectItem(root, "shmsize"); + if (shmsize && shmsize->type == cJSON_Number) { + pWrapper->procShm.size = shmsize->valueint; } } - if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_END) { - for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - if (pWrapper->procShm.id >= 0) { - dDebug("shmid:%d, is closed, size:%d", pWrapper->procShm.id, pWrapper->procShm.size); - taosDropShm(&pWrapper->procShm); - } + if (!tsMultiProcess || pWrapper->pDnode->ntype == DNODE || pWrapper->pDnode->ntype == NODE_END) { + if (pWrapper->procShm.id >= 0) { + dDebug("node:%s, shmid:%d, is closed, size:%d", pWrapper->name, pWrapper->procShm.id, pWrapper->procShm.size); + taosDropShm(&pWrapper->procShm); } } else { - SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; if (taosAttachShm(&pWrapper->procShm) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); dError("shmid:%d, failed to attach shm since %s", pWrapper->procShm.id, terrstr()); @@ -197,7 +188,7 @@ int32_t dmReadShmFile(SDnode *pDnode) { dInfo("node:%s, shmid:%d is attached, size:%d", pWrapper->name, pWrapper->procShm.id, pWrapper->procShm.size); } - dDebug("successed to load %s", file); + dDebug("node:%s, successed to load %s", pWrapper->name, file); code = 0; _OVER: @@ -207,7 +198,7 @@ _OVER: return code; } -int32_t dmWriteShmFile(SDnode *pDnode) { +int32_t dmWriteShmFile(SMgmtWrapper *pWrapper) { int32_t code = -1; int32_t len = 0; char content[MAXLEN + 1] = {0}; @@ -215,37 +206,30 @@ int32_t dmWriteShmFile(SDnode *pDnode) { char realfile[PATH_MAX] = {0}; TdFilePtr pFile = NULL; - snprintf(file, sizeof(file), "%s%s.shmfile.bak", pDnode->data.dataDir, TD_DIRSEP); - snprintf(realfile, sizeof(realfile), "%s%s.shmfile", pDnode->data.dataDir, TD_DIRSEP); + snprintf(file, sizeof(file), "%s%sshmfile.bak", pWrapper->path, TD_DIRSEP); + snprintf(realfile, sizeof(realfile), "%s%sshmfile", pWrapper->path, TD_DIRSEP); pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to open file:%s since %s", file, terrstr()); + dError("node:%s, failed to open file:%s since %s", pWrapper->name, file, terrstr()); goto _OVER; } len += snprintf(content + len, MAXLEN - len, "{\n"); - for (EDndNodeType ntype = DNODE + 1; ntype < NODE_END; ++ntype) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\":%d,\n", dmProcName(ntype), pWrapper->procShm.id); - if (ntype == NODE_END - 1) { - len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d\n", dmProcName(ntype), pWrapper->procShm.size); - } else { - len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d,\n", dmProcName(ntype), pWrapper->procShm.size); - } - } + len += snprintf(content + len, MAXLEN - len, " \"shmid\":%d,\n", pWrapper->procShm.id); + len += snprintf(content + len, MAXLEN - len, " \"shmsize\":%d\n", pWrapper->procShm.size); len += snprintf(content + len, MAXLEN - len, "}\n"); if (taosWriteFile(pFile, content, len) != len) { terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to write file:%s since %s", file, terrstr()); + dError("node:%s, failed to write file:%s since %s", pWrapper->name, file, terrstr()); goto _OVER; } if (taosFsyncFile(pFile) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to fsync file:%s since %s", file, terrstr()); + dError("node:%s, failed to fsync file:%s since %s", pWrapper->name, file, terrstr()); goto _OVER; } @@ -253,11 +237,11 @@ int32_t dmWriteShmFile(SDnode *pDnode) { if (taosRenameFile(file, realfile) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to rename %s to %s since %s", file, realfile, terrstr()); + dError("node:%s, failed to rename %s to %s since %s", pWrapper->name, file, realfile, terrstr()); return -1; } - dInfo("successed to write %s", realfile); + dInfo("node:%s, successed to write %s", pWrapper->name, realfile); code = 0; _OVER: diff --git a/source/dnode/mgmt/interface/src/dmInt.c b/source/dnode/mgmt/interface/src/dmInt.c index ad5b35a3f8..10599c0043 100644 --- a/source/dnode/mgmt/interface/src/dmInt.c +++ b/source/dnode/mgmt/interface/src/dmInt.c @@ -171,9 +171,3 @@ void dmGetMonitorSysInfo(SMonSysInfo *pInfo) { taosGetCardInfoDelta(&pInfo->net_in, &pInfo->net_out); taosGetProcIODelta(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); } - -SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) { - SMsgCb msgCb = pWrapper->pDnode->data.msgCb; - msgCb.pWrapper = pWrapper; - return msgCb; -} \ No newline at end of file diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmInt.c b/source/dnode/mgmt/mgmt_bnode/src/bmInt.c index ec261d654c..72a7f2cc86 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmInt.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmInt.c @@ -19,7 +19,7 @@ static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); } static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { - SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper); + SMsgCb msgCb = pMgmt->pDnode->data.msgCb;; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 2d8914df25..9e83c317ea 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -39,7 +39,7 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) { } static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { - SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper); + SMsgCb msgCb = pMgmt->pDnode->data.msgCb; msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue; msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue; msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue; @@ -225,11 +225,18 @@ static int32_t mmStart(SMgmtWrapper *pWrapper) { return mndStart(pMgmt->pMnode); } +static void mmStop(SMgmtWrapper *pWrapper) { + dDebug("mnode-mgmt start to stop"); + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + mndStop(pMgmt->pMnode); +} + void mmSetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = mmOpen; mgmtFp.closeFp = mmClose; mgmtFp.startFp = mmStart; + mgmtFp.stopFp = mmStop; mgmtFp.createFp = mmProcessCreateReq; mgmtFp.dropFp = mmProcessDropReq; mgmtFp.requiredFp = mmRequire; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c index 76c4f2af0d..53d4cefbff 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c @@ -19,7 +19,7 @@ static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); } static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { - SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper); + SMsgCb msgCb = pMgmt->pDnode->data.msgCb; msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue; msgCb.qsizeFp = qmGetQueueSize; diff --git a/source/dnode/mgmt/mgmt_snode/src/smInt.c b/source/dnode/mgmt/mgmt_snode/src/smInt.c index 911c1b6f70..ad7909a8f7 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smInt.c +++ b/source/dnode/mgmt/mgmt_snode/src/smInt.c @@ -19,7 +19,7 @@ static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); } static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { - SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper); + SMsgCb msgCb = pMgmt->pDnode->data.msgCb; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index dfe79c2dde..73397316e0 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -143,7 +143,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper); + SMsgCb msgCb = pMgmt->pDnode->data.msgCb; msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index d70377e9ad..c4e05c54eb 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -128,7 +128,7 @@ static void *vmOpenVnodeFunc(void *param) { pMgmt->state.openVnodes, pMgmt->state.totalVnodes); dmReportStartup(pDnode, "open-vnodes", stepDesc); - SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper); + SMsgCb msgCb = pMgmt->pDnode->data.msgCb; msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 5a1780530c..8b2e77c1dd 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -220,7 +220,6 @@ static int32_t mndInitSteps(SMnode *pMnode, bool deploy) { if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1; if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1; - if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return -1; return 0; } @@ -346,6 +345,8 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { int32_t mndStart(SMnode *pMnode) { return mndInitTimer(pMnode); } +void mndStop(SMnode *pMnode) { return mndCleanupTimer(pMnode); } + int32_t mndProcessMsg(SNodeMsg *pMsg) { SMnode *pMnode = pMsg->pNode; SRpcMsg *pRpc = &pMsg->rpcMsg;