From 13e6efe3fc17b8aa61d0bf8424507ab1df906b15 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 1 Apr 2022 15:29:13 +0800 Subject: [PATCH 01/11] shm --- include/os/osShm.h | 2 +- source/dnode/mgmt/main/src/dndExec.c | 17 ++++------------- source/os/src/osProc.c | 2 +- source/os/src/osShm.c | 4 ++-- 4 files changed, 8 insertions(+), 17 deletions(-) diff --git a/include/os/osShm.h b/include/os/osShm.h index d26a99e277..61ffc0f6cc 100644 --- a/include/os/osShm.h +++ b/include/os/osShm.h @@ -26,7 +26,7 @@ typedef struct { void* ptr; } SShm; -int32_t taosCreateShm(SShm *pShm, int32_t shmsize) ; +int32_t taosCreateShm(SShm *pShm, int32_t key, int32_t shmsize) ; void taosDropShm(SShm *pShm); int32_t taosAttachShm(SShm *pShm); diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index b37893aa6f..0cc429c0d9 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -202,7 +202,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { if (!pWrapper->required) continue; int32_t shmsize = 1024 * 1024 * 2; // size will be a configuration item - if (taosCreateShm(&pWrapper->shm, shmsize) != 0) { + if (taosCreateShm(&pWrapper->shm, n, shmsize) != 0) { terrno = TAOS_SYSTEM_ERROR(terrno); dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr()); return -1; @@ -261,17 +261,8 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { if (pDnode->ntype == NODE_MAX) continue; if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) { - dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId); - taosKillProc(pWrapper->procId); - } - } - - for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - if (!pWrapper->required) continue; - if (pDnode->ntype == NODE_MAX) continue; - - if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) { + // dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId); + // taosKillProc(pWrapper->procId); dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId); taosWaitProc(pWrapper->procId); dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId); @@ -340,7 +331,7 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { dndReportStartup(pDnode, "TDengine", "initialized successfully"); while (1) { if (pDnode->event == DND_EVENT_STOP) { - dInfo("dnode is about to stop"); + dInfo("%s is about to stop", pWrapper->name); break; } taosMsleep(100); diff --git a/source/os/src/osProc.c b/source/os/src/osProc.c index 6b52fa30be..262160c033 100644 --- a/source/os/src/osProc.c +++ b/source/os/src/osProc.c @@ -33,7 +33,7 @@ int32_t taosNewProc(char **args) { } void taosWaitProc(int32_t pid) { - int32_t status = 0; + int32_t status = -1; waitpid(pid, &status, 0); } diff --git a/source/os/src/osShm.c b/source/os/src/osShm.c index ba184c1f5d..bf784f14ac 100644 --- a/source/os/src/osShm.c +++ b/source/os/src/osShm.c @@ -17,10 +17,10 @@ #define _DEFAULT_SOURCE #include "os.h" -int32_t taosCreateShm(SShm* pShm, int32_t shmsize) { +int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) { pShm->id = -1; - int32_t shmid = shmget(0X95279527, shmsize, IPC_CREAT | 0600); + int32_t shmid = shmget(0X95270000 + key, shmsize, IPC_CREAT | 0600); if (shmid < 0) { return -1; } From 1649d0ddda9cf119267df275cb93a66721107630 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 1 Apr 2022 16:36:36 +0800 Subject: [PATCH 02/11] shm --- include/dnode/mnode/mnode.h | 3 +-- source/dnode/mgmt/mm/src/mmFile.c | 4 ++-- source/dnode/mgmt/mm/src/mmInt.c | 15 +++++-------- source/dnode/mnode/impl/inc/mndCluster.h | 1 + source/dnode/mnode/impl/inc/mndInt.h | 1 - source/dnode/mnode/impl/src/mndCluster.c | 19 +++++++++++++++- source/dnode/mnode/impl/src/mndQnode.c | 2 +- source/dnode/mnode/impl/src/mndSnode.c | 2 +- source/dnode/mnode/impl/src/mnode.c | 28 +++++------------------- 9 files changed, 36 insertions(+), 39 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 5b88a9d6af..08ab63e55a 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -29,8 +29,7 @@ extern "C" { typedef struct SMnode SMnode; typedef struct { - int32_t dnodeId; - int64_t clusterId; + bool deploy; int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; diff --git a/source/dnode/mgmt/mm/src/mmFile.c b/source/dnode/mgmt/mm/src/mmFile.c index e5cc0ce087..76aba771cb 100644 --- a/source/dnode/mgmt/mm/src/mmFile.c +++ b/source/dnode/mgmt/mm/src/mmFile.c @@ -111,7 +111,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) { TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno);; + terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to write %s since %s", file, terrstr()); return -1; } @@ -145,7 +145,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) { snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP); if (taosRenameFile(file, realfile) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno);; + terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to rename %s since %s", file, terrstr()); return -1; } diff --git a/source/dnode/mgmt/mm/src/mmInt.c b/source/dnode/mgmt/mm/src/mmInt.c index 3d15c9b9ae..f4e00b5921 100644 --- a/source/dnode/mgmt/mm/src/mmInt.c +++ b/source/dnode/mgmt/mm/src/mmInt.c @@ -39,10 +39,6 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) { } static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { - SDnode *pDnode = pMgmt->pDnode; - pOption->dnodeId = pDnode->dnodeId; - pOption->clusterId = pDnode->clusterId; - SMsgCb msgCb = {0}; msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue; @@ -66,6 +62,7 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { pReplica->id = 1; pReplica->port = pDnode->serverPort; tstrncpy(pReplica->fqdn, pDnode->localFqdn, TSDB_FQDN_LEN); + pOption->deploy = true; pMgmt->selfIndex = pOption->selfIndex; pMgmt->replica = pOption->replica; @@ -77,6 +74,7 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { pOption->selfIndex = pMgmt->selfIndex; pOption->replica = pMgmt->replica; memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); + pOption->deploy = false; } static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { @@ -89,7 +87,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre pReplica->id = pCreate->replicas[i].id; pReplica->port = pCreate->replicas[i].port; memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); - if (pReplica->id == pOption->dnodeId) { + if (pReplica->id == pMgmt->pDnode->dnodeId) { pOption->selfIndex = i; } } @@ -98,6 +96,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre dError("failed to build mnode options since %s", terrstr()); return -1; } + pOption->deploy = true; pMgmt->selfIndex = pOption->selfIndex; pMgmt->replica = pOption->replica; @@ -225,9 +224,7 @@ int32_t mmOpenFromMsg(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq) { return code; } -static int32_t mmOpen(SMgmtWrapper *pWrapper) { - return mmOpenFromMsg(pWrapper, NULL); -} +static int32_t mmOpen(SMgmtWrapper *pWrapper) { return mmOpenFromMsg(pWrapper, NULL); } static int32_t mmStart(SMgmtWrapper *pWrapper) { dDebug("mnode-mgmt start to run"); @@ -258,7 +255,7 @@ int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encry } int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, - SMonGrantInfo *pGrantInfo) { + SMonGrantInfo *pGrantInfo) { SMnodeMgmt *pMgmt = pWrapper->pMgmt; return mndGetMonitorInfo(pMgmt->pMnode, pClusterInfo, pVgroupInfo, pGrantInfo); } diff --git a/source/dnode/mnode/impl/inc/mndCluster.h b/source/dnode/mnode/impl/inc/mndCluster.h index 0206695b88..5b7bac4486 100644 --- a/source/dnode/mnode/impl/inc/mndCluster.h +++ b/source/dnode/mnode/impl/inc/mndCluster.h @@ -25,6 +25,7 @@ extern "C" { int32_t mndInitCluster(SMnode *pMnode); void mndCleanupCluster(SMnode *pMnode); int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len); +int64_t mndGetClusterId(SMnode *pMnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index f89e9d8fe0..1cb0c78aa5 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -100,7 +100,6 @@ typedef struct { } SGrantInfo; typedef struct SMnode { - int32_t dnodeId; int64_t clusterId; int8_t replica; int8_t selfIndex; diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index dde7e1fe8f..94e1efde61 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -17,7 +17,7 @@ #include "mndCluster.h" #include "mndShow.h" -#define TSDB_CLUSTER_VER_NUMBE 1 +#define TSDB_CLUSTER_VER_NUMBE 1 #define TSDB_CLUSTER_RESERVE_SIZE 64 static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster); @@ -61,6 +61,23 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) { return 0; } +int64_t mndGetClusterId(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + int64_t clusterId = -1; + + while (1) { + SClusterObj *pCluster = NULL; + pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pCluster); + if (pIter == NULL) break; + + clusterId = pCluster->id; + sdbRelease(pSdb, pCluster); + } + + return clusterId; +} + static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 4b19a26bc4..85718e2037 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -423,7 +423,7 @@ static int32_t mndProcessDropQnodeReq(SNodeMsg *pReq) { DROP_QNODE_OVER: if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { - mError("qnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); + mError("qnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); } mndReleaseQnode(pMnode, pObj); diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index 5e0d9fae9a..4f24c6f7eb 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -433,7 +433,7 @@ static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq) { DROP_SNODE_OVER: if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { - mError("snode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); + mError("snode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); } mndReleaseSnode(pMnode, pObj); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 67b7d6dd45..5c3dd778e1 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -187,7 +187,7 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle return 0; } -static int32_t mndInitSteps(SMnode *pMnode) { +static int32_t mndInitSteps(SMnode *pMnode, bool deploy) { if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1; if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1; @@ -210,7 +210,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1; - if (pMnode->clusterId <= 0) { + if (deploy) { if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1; } else { if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1; @@ -263,23 +263,15 @@ static int32_t mndExecSteps(SMnode *pMnode) { } } + pMnode->clusterId = mndGetClusterId(pMnode); return 0; } -static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { - pMnode->dnodeId = pOption->dnodeId; - pMnode->clusterId = pOption->clusterId; +static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->replica = pOption->replica; pMnode->selfIndex = pOption->selfIndex; memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); pMnode->msgCb = pOption->msgCb; - - if (pMnode->dnodeId < 0 || pMnode->clusterId < 0) { - terrno = TSDB_CODE_MND_INVALID_OPTIONS; - return -1; - } - - return 0; } SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { @@ -294,6 +286,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { char timestr[24] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); + mndSetOptions(pMnode, pOption); pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep)); if (pMnode->pSteps == NULL) { @@ -312,16 +305,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { return NULL; } - code = mndSetOptions(pMnode, pOption); - if (code != 0) { - code = terrno; - mError("failed to open mnode since %s", terrstr()); - mndClose(pMnode); - terrno = code; - return NULL; - } - - code = mndInitSteps(pMnode); + code = mndInitSteps(pMnode, pOption->deploy); if (code != 0) { code = terrno; mError("failed to open mnode since %s", terrstr()); From c3917c2544d19e756d113267d2da2d0cb986c340 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 1 Apr 2022 16:46:31 +0800 Subject: [PATCH 03/11] shm --- source/dnode/mgmt/test/vnode/vnode.cpp | 40 -------------------------- source/dnode/mgmt/vm/src/vmMsg.c | 6 ---- 2 files changed, 46 deletions(-) diff --git a/source/dnode/mgmt/test/vnode/vnode.cpp b/source/dnode/mgmt/test/vnode/vnode.cpp index e2de3fc33a..35a98154c2 100644 --- a/source/dnode/mgmt/test/vnode/vnode.cpp +++ b/source/dnode/mgmt/test/vnode/vnode.cpp @@ -70,46 +70,6 @@ TEST_F(DndTestVnode, 01_Create_Vnode) { ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED); } } - - { - SCreateVnodeReq createReq = {0}; - createReq.vgId = 2; - createReq.dnodeId = 3; - strcpy(createReq.db, "1.d1"); - createReq.dbUid = 9527; - createReq.vgVersion = 1; - createReq.cacheBlockSize = 16; - createReq.totalBlocks = 10; - createReq.daysPerFile = 10; - createReq.daysToKeep0 = 3650; - createReq.daysToKeep1 = 3650; - createReq.daysToKeep2 = 3650; - createReq.minRows = 100; - createReq.minRows = 4096; - createReq.commitTime = 3600; - createReq.fsyncPeriod = 3000; - createReq.walLevel = 1; - createReq.precision = 0; - createReq.compression = 2; - createReq.replica = 1; - createReq.quorum = 1; - createReq.update = 0; - createReq.cacheLastRow = 0; - createReq.selfIndex = 0; - for (int r = 0; r < createReq.replica; ++r) { - SReplica* pReplica = &createReq.replicas[r]; - pReplica->id = 1; - pReplica->port = 9527; - } - - int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq); - void* pReq = rpcMallocCont(contLen); - tSerializeSCreateVnodeReq(pReq, contLen, &createReq); - - SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_VNODE, pReq, contLen); - ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_INVALID_OPTION); - } } TEST_F(DndTestVnode, 02_Alter_Vnode) { diff --git a/source/dnode/mgmt/vm/src/vmMsg.c b/source/dnode/mgmt/vm/src/vmMsg.c index f00bb89354..89cf8dc5da 100644 --- a/source/dnode/mgmt/vm/src/vmMsg.c +++ b/source/dnode/mgmt/vm/src/vmMsg.c @@ -68,12 +68,6 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SWrapperCfg wrapperCfg = {0}; vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg); - if (createReq.dnodeId != pMgmt->pDnode->dnodeId) { - terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION; - dDebug("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); - return -1; - } - SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId); if (pVnode != NULL) { dDebug("vgId:%d, already exist", createReq.vgId); From e1660a127658880ce6ca5687766cfac979b8c4a8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 1 Apr 2022 17:56:57 +0800 Subject: [PATCH 04/11] shm --- source/dnode/mgmt/main/exe/dndMain.c | 5 +++-- source/dnode/mgmt/main/src/dndExec.c | 10 ++++++++-- source/dnode/mgmt/main/src/dndInt.c | 9 --------- source/os/src/osSignal.c | 2 +- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/source/dnode/mgmt/main/exe/dndMain.c b/source/dnode/mgmt/main/exe/dndMain.c index 0d2ddbfbbc..c6a109d62a 100644 --- a/source/dnode/mgmt/main/exe/dndMain.c +++ b/source/dnode/mgmt/main/exe/dndMain.c @@ -30,7 +30,6 @@ static struct { } global = {0}; static void dndStopDnode(int signum, void *info, void *ctx) { - dInfo("system signal:%d received", signum); SDnode *pDnode = atomic_val_compare_exchange_ptr(&global.pDnode, 0, global.pDnode); if (pDnode != NULL) { dndHandleEvent(pDnode, DND_EVENT_STOP); @@ -41,8 +40,10 @@ static void dndSetSignalHandle() { taosSetSignal(SIGTERM, dndStopDnode); taosSetSignal(SIGHUP, dndStopDnode); taosSetSignal(SIGINT, dndStopDnode); + taosSetSignal(SIGTSTP, dndStopDnode); taosSetSignal(SIGABRT, dndStopDnode); taosSetSignal(SIGBREAK, dndStopDnode); + taosSetSignal(SIGQUIT, dndStopDnode); if (!tsMultiProcess) { } else if (global.ntype == DNODE || global.ntype == NODE_MAX) { @@ -72,7 +73,7 @@ static int32_t dndParseArgs(int32_t argc, char const *argv[]) { } else if (strcmp(argv[i], "-n") == 0) { global.ntype = atoi(argv[++i]); if (global.ntype <= DNODE || global.ntype > NODE_MAX) { - printf("'-n' range is [1-5], default is 0\n"); + printf("'-n' range is [1 - %d], default is 0\n", NODE_MAX - 1); return -1; } } else if (strcmp(argv[i], "-k") == 0) { diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index 0cc429c0d9..281bc752ca 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -180,6 +180,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { while (1) { if (pDnode->event == DND_EVENT_STOP) { dInfo("dnode is about to stop"); + dndSetStatus(pDnode, DND_STAT_STOPPED); break; } taosMsleep(100); @@ -255,14 +256,16 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { while (1) { if (pDnode->event == DND_EVENT_STOP) { dInfo("dnode is about to stop"); + dndSetStatus(pDnode, DND_STAT_STOPPED); + for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; if (!pWrapper->required) continue; if (pDnode->ntype == NODE_MAX) continue; if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) { - // dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId); - // taosKillProc(pWrapper->procId); + dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId); + taosKillProc(pWrapper->procId); dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId); taosWaitProc(pWrapper->procId); dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId); @@ -332,10 +335,13 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { while (1) { if (pDnode->event == DND_EVENT_STOP) { dInfo("%s is about to stop", pWrapper->name); + dndSetStatus(pDnode, DND_STAT_STOPPED); break; } taosMsleep(100); } + + return 0; } int32_t dndRun(SDnode *pDnode) { diff --git a/source/dnode/mgmt/main/src/dndInt.c b/source/dnode/mgmt/main/src/dndInt.c index 602ebc6b3c..6f6e21b983 100644 --- a/source/dnode/mgmt/main/src/dndInt.c +++ b/source/dnode/mgmt/main/src/dndInt.c @@ -133,14 +133,6 @@ _OVER: void dndClose(SDnode *pDnode) { if (pDnode == NULL) return; - if (dndGetStatus(pDnode) == DND_STAT_STOPPED) { - dError("dnode is shutting down, data:%p", pDnode); - return; - } - - dInfo("start to close dnode, data:%p", pDnode); - dndSetStatus(pDnode, DND_STAT_STOPPED); - for (ENodeType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; dndCloseNode(pWrapper); @@ -151,7 +143,6 @@ void dndClose(SDnode *pDnode) { } void dndHandleEvent(SDnode *pDnode, EDndEvent event) { - dInfo("dnode receive %s event, data:%p", dndEventStr(event), pDnode); if (event == DND_EVENT_STOP) { pDnode->event = event; } diff --git a/source/os/src/osSignal.c b/source/os/src/osSignal.c index d4e6cb3318..1d7fa517e5 100644 --- a/source/os/src/osSignal.c +++ b/source/os/src/osSignal.c @@ -59,7 +59,7 @@ void taosSetSignal(int32_t signum, FSignalHandler sigfp) { struct sigaction act; memset(&act, 0, sizeof(act)); #if 1 - act.sa_flags = SA_SIGINFO; + act.sa_flags = SA_SIGINFO | SA_RESTART; act.sa_sigaction = (FLinuxSignalHandler)sigfp; #else act.sa_handler = sigfp; From 5a2a34f1bd8593ad7d2f6d587c26fa57f610a189 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 1 Apr 2022 19:07:05 +0800 Subject: [PATCH 05/11] add multiprocess mode to CI --- tests/script/jenkins/basic.txt | 7 +++++++ tests/script/sh/deploy.sh | 15 ++++++--------- tests/script/test.sh | 16 ++++++++++++---- tests/tsim/inc/simInt.h | 1 + tests/tsim/src/simExe.c | 31 +++++++++++++++++-------------- tests/tsim/src/simMain.c | 3 +++ 6 files changed, 46 insertions(+), 27 deletions(-) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 811c9941f7..3a5d782716 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -50,4 +50,11 @@ ./test.sh -f tsim/stable/values.sim ./test.sh -f tsim/stable/vnode3.sim + +# --- for multi process mode +./test.sh -f tsim/user/basic1.sim -m +./test.sh -f tsim/insert/basic1.sim -m +./test.sh -f tsim/stable/vnode3.sim -m +./test.sh -f tsim/tmq/basic.sim -m + #======================b1-end=============== diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 1bbfccf989..38b6d9aadb 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -5,18 +5,12 @@ set +e echo "Executing deploy.sh" -if [ $# != 4 ]; then - echo "argument list need input : " - echo " -n nodeName" - echo " -i nodePort" - exit 1 -fi - UNAME_BIN=`which uname` OS_TYPE=`$UNAME_BIN` NODE_NAME= NODE= -while getopts "n:i:" arg +MULTIPROCESS=0 +while getopts "n:i:m" arg do case $arg in n) @@ -25,6 +19,9 @@ do i) NODE=$OPTARG ;; + m) + MULTIPROCESS=1 + ;; ?) echo "unkonw argument" ;; @@ -145,5 +142,5 @@ echo "statusInterval 1" >> $TAOS_CFG echo "asyncLog 0" >> $TAOS_CFG echo "locale en_US.UTF-8" >> $TAOS_CFG echo "telemetryReporting 0" >> $TAOS_CFG -echo "multiProcess 0" >> $TAOS_CFG +echo "multiProcess ${MULTIPROCESS}" >> $TAOS_CFG echo " " >> $TAOS_CFG diff --git a/tests/script/test.sh b/tests/script/test.sh index f89b9fb1a2..efd193d256 100755 --- a/tests/script/test.sh +++ b/tests/script/test.sh @@ -7,7 +7,6 @@ ################################################## set +e -#set -x FILE_NAME= RELEASE=0 @@ -16,7 +15,8 @@ VALGRIND=0 UNIQUE=0 UNAME_BIN=`which uname` OS_TYPE=`$UNAME_BIN` -while getopts "f:avu" arg +MULTIPROCESS=1 +while getopts "f:avum" arg do case $arg in f) @@ -28,6 +28,9 @@ do u) UNIQUE=1 ;; + m) + MULTIPROCESS=1 + ;; ?) echo "unknow argument" ;; @@ -125,8 +128,13 @@ if [ -n "$FILE_NAME" ]; then echo valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${CODE_DIR}/../script/valgrind.log $PROGRAM -c $CFG_DIR -f $FILE_NAME valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${CODE_DIR}/../script/valgrind.log $PROGRAM -c $CFG_DIR -f $FILE_NAME else - echo "ExcuteCmd:" $PROGRAM -c $CFG_DIR -f $FILE_NAME - $PROGRAM -c $CFG_DIR -f $FILE_NAME + if [[ $MULTIPROCESS -eq 1 ]];then + echo "ExcuteCmd(multiprocess):" $PROGRAM -m -c $CFG_DIR -f $FILE_NAME + $PROGRAM -m -c $CFG_DIR -f $FILE_NAME + else + echo "ExcuteCmd(singleprocess):" $PROGRAM -c $CFG_DIR -f $FILE_NAME + $PROGRAM -c $CFG_DIR -f $FILE_NAME + fi fi else echo "ExcuteCmd:" $PROGRAM -c $CFG_DIR -f basicSuite.sim diff --git a/tests/tsim/inc/simInt.h b/tests/tsim/inc/simInt.h index 1e2190e308..a667139de1 100644 --- a/tests/tsim/inc/simInt.h +++ b/tests/tsim/inc/simInt.h @@ -155,6 +155,7 @@ extern int32_t simScriptSucced; extern int32_t simDebugFlag; extern char simScriptDir[]; extern bool abortExecution; +extern bool useMultiProcess; SScript *simParseScript(char *fileName); SScript *simProcessCallOver(SScript *script); diff --git a/tests/tsim/src/simExe.c b/tests/tsim/src/simExe.c index 855705e904..9a0b48197a 100644 --- a/tests/tsim/src/simExe.c +++ b/tests/tsim/src/simExe.c @@ -305,25 +305,24 @@ bool simExecuteRunBackCmd(SScript *script, char *option) { return true; } -void simReplaceShToBat(char *dst) { - char *sh = strstr(dst, ".sh"); - if (sh != NULL) { +void simReplaceStr(char *buf, char *src, char *dst) { + char *begin = strstr(buf, src); + if (begin != NULL) { + int32_t srcLen = (int32_t)strlen(src); int32_t dstLen = (int32_t)strlen(dst); - char *end = dst + dstLen; - *(end + 1) = 0; + int32_t interval = (dstLen - srcLen); + int32_t remainLen = (int32_t)strlen(buf); + char *end = buf + remainLen; + *(end + interval) = 0; - for (char *p = end; p >= sh; p--) { - *(p + 1) = *p; + for (char *p = end; p >= begin; p--) { + *(p + interval) = *p; } - sh[0] = '.'; - sh[1] = 'b'; - sh[2] = 'a'; - sh[3] = 't'; - sh[4] = ' '; + memcpy(begin, dst, dstLen); } - simDebug("system cmd is %s", dst); + simInfo("system cmd is %s", buf); } bool simExecuteSystemCmd(SScript *script, char *option) { @@ -334,9 +333,13 @@ bool simExecuteSystemCmd(SScript *script, char *option) { simVisuallizeOption(script, option, buf + strlen(buf)); #else sprintf(buf, "%s%s", simScriptDir, option); - simReplaceShToBat(buf); + simReplaceStr(buf, ".sh", ".bat"); #endif + if (useMultiProcess) { + simReplaceStr(buf, "deploy.sh", "deploy.sh -m"); + } + simLogSql(buf, true); int32_t code = system(buf); int32_t repeatTimes = 0; diff --git a/tests/tsim/src/simMain.c b/tests/tsim/src/simMain.c index b57299e752..8898f1b201 100644 --- a/tests/tsim/src/simMain.c +++ b/tests/tsim/src/simMain.c @@ -18,6 +18,7 @@ bool simExecSuccess = false; bool abortExecution = false; +bool useMultiProcess = false; void simHandleSignal(int32_t signo, void *sigInfo, void *context) { simSystemCleanUp(); @@ -32,6 +33,8 @@ int32_t main(int32_t argc, char *argv[]) { tstrncpy(configDir, argv[++i], 128); } else if (strcmp(argv[i], "-f") == 0 && i < argc - 1) { strcpy(scriptFile, argv[++i]); + } else if (strcmp(argv[i], "-m") == 0) { + useMultiProcess = true; } else { printf("usage: %s [options] \n", argv[0]); printf(" [-c config]: config directory, default is: %s\n", configDir); From 420390c0ef5d85d72921f88c0f79e8b66c590016 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 1 Apr 2022 19:13:28 +0800 Subject: [PATCH 06/11] minor changes --- tests/script/test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/test.sh b/tests/script/test.sh index efd193d256..8b77575e18 100755 --- a/tests/script/test.sh +++ b/tests/script/test.sh @@ -15,7 +15,7 @@ VALGRIND=0 UNIQUE=0 UNAME_BIN=`which uname` OS_TYPE=`$UNAME_BIN` -MULTIPROCESS=1 +MULTIPROCESS=0 while getopts "f:avum" arg do case $arg in From 8176ff04d14c521450be575f4b3742d28b39ad84 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 1 Apr 2022 19:43:29 +0800 Subject: [PATCH 07/11] minor changes --- source/dnode/mgmt/main/src/dndExec.c | 4 ++-- tests/script/jenkins/basic.txt | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index 281bc752ca..54cadb177a 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -45,7 +45,7 @@ int32_t dndOpenNode(SMgmtWrapper *pWrapper) { } void dndCloseNode(SMgmtWrapper *pWrapper) { - dDebug("node:%s, start to close", pWrapper->name); + dDebug("node:%s, mgmt start to close", pWrapper->name); pWrapper->required = false; taosWLockLatch(&pWrapper->latch); if (pWrapper->deployed) { @@ -62,7 +62,7 @@ void dndCloseNode(SMgmtWrapper *pWrapper) { taosProcCleanup(pWrapper->pProc); pWrapper->pProc = NULL; } - dDebug("node:%s, has been closed", pWrapper->name); + dDebug("node:%s, mgmt has been closed", pWrapper->name); } static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index d2f19d766f..fc87776658 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -54,7 +54,6 @@ # --- for multi process mode ./test.sh -f tsim/user/basic1.sim -m -./test.sh -f tsim/insert/basic1.sim -m ./test.sh -f tsim/stable/vnode3.sim -m ./test.sh -f tsim/tmq/basic.sim -m From 44311425db094fba75c0b7ad6ad83b8d42f45931 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 1 Apr 2022 19:52:14 +0800 Subject: [PATCH 08/11] multi process --- include/common/tmsgcb.h | 2 -- source/common/src/tmsgcb.c | 4 ---- source/dnode/mgmt/bm/src/bmInt.c | 1 - source/dnode/mgmt/main/src/dndTransport.c | 1 - source/dnode/mgmt/mm/src/mmInt.c | 7 +------ source/dnode/mgmt/qm/src/qmInt.c | 7 +------ source/dnode/mgmt/sm/src/smInt.c | 7 +------ source/dnode/mgmt/vm/src/vmInt.c | 6 +----- source/dnode/mgmt/vm/src/vmMsg.c | 6 +----- source/dnode/mnode/impl/src/mndProfile.c | 6 ------ 10 files changed, 5 insertions(+), 42 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index cb59599d9a..6c3671a8d6 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -50,7 +50,6 @@ typedef struct { PutToQueueFp queueFps[QUEUE_MAX]; GetQueueSizeFp qsizeFp; SendReqFp sendReqFp; - SendMnodeReqFp sendMnodeReqFp; SendRspFp sendRspFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp; ReleaseHandleFp releaseHandleFp; @@ -60,7 +59,6 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); -int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); void tmsgSendRsp(const SRpcMsg* pRsp); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); void tmsgReleaseHandle(void* handle, int8_t type); diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index e90634a604..cb5e2b07c1 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -32,10 +32,6 @@ int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) { return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); } -int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) { - return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq); -} - void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); } void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { diff --git a/source/dnode/mgmt/bm/src/bmInt.c b/source/dnode/mgmt/bm/src/bmInt.c index 4b87f4463c..cbaf247a5b 100644 --- a/source/dnode/mgmt/bm/src/bmInt.c +++ b/source/dnode/mgmt/bm/src/bmInt.c @@ -22,7 +22,6 @@ static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { SMsgCb msgCb = {0}; msgCb.pWrapper = pMgmt->pWrapper; msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; diff --git a/source/dnode/mgmt/main/src/dndTransport.c b/source/dnode/mgmt/main/src/dndTransport.c index 07ea0309a8..a87937bc8d 100644 --- a/source/dnode/mgmt/main/src/dndTransport.c +++ b/source/dnode/mgmt/main/src/dndTransport.c @@ -398,7 +398,6 @@ SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) { .pWrapper = pWrapper, .registerBrokenLinkArgFp = dndRegisterBrokenLinkArg, .releaseHandleFp = dndReleaseHandle, - .sendMnodeReqFp = dndSendReqToMnode, .sendReqFp = dndSendReqToDnode, .sendRspFp = dndSendRsp, }; diff --git a/source/dnode/mgmt/mm/src/mmInt.c b/source/dnode/mgmt/mm/src/mmInt.c index f4e00b5921..301ca598e6 100644 --- a/source/dnode/mgmt/mm/src/mmInt.c +++ b/source/dnode/mgmt/mm/src/mmInt.c @@ -39,16 +39,11 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) { } static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { - SMsgCb msgCb = {0}; - msgCb.pWrapper = pMgmt->pWrapper; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue; msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue; msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue; msgCb.queueFps[SYNC_QUEUE] = mmPutMsgToWriteQueue; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/qm/src/qmInt.c b/source/dnode/mgmt/qm/src/qmInt.c index 11c80a2904..8079859b96 100644 --- a/source/dnode/mgmt/qm/src/qmInt.c +++ b/source/dnode/mgmt/qm/src/qmInt.c @@ -19,15 +19,10 @@ static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { - SMsgCb msgCb = {0}; - msgCb.pWrapper = pMgmt->pWrapper; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue; msgCb.qsizeFp = qmGetQueueSize; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/sm/src/smInt.c b/source/dnode/mgmt/sm/src/smInt.c index a639fc76bb..6d2e2aaefd 100644 --- a/source/dnode/mgmt/sm/src/smInt.c +++ b/source/dnode/mgmt/sm/src/smInt.c @@ -19,12 +19,7 @@ static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { - SMsgCb msgCb = {0}; - msgCb.pWrapper = pMgmt->pWrapper; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/vm/src/vmInt.c b/source/dnode/mgmt/vm/src/vmInt.c index b52c6253dc..d9ef7b5ae6 100644 --- a/source/dnode/mgmt/vm/src/vmInt.c +++ b/source/dnode/mgmt/vm/src/vmInt.c @@ -128,16 +128,12 @@ static void *vmOpenVnodeFunc(void *param) { pMgmt->state.openVnodes, pMgmt->state.totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SMsgCb msgCb = {0}; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.qsizeFp = vmGetQueueSize; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { diff --git a/source/dnode/mgmt/vm/src/vmMsg.c b/source/dnode/mgmt/vm/src/vmMsg.c index 89cf8dc5da..c3ad74d246 100644 --- a/source/dnode/mgmt/vm/src/vmMsg.c +++ b/source/dnode/mgmt/vm/src/vmMsg.c @@ -76,16 +76,12 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - SMsgCb msgCb = {0}; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.qsizeFp = vmGetQueueSize; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; vnodeCfg.msgCb = msgCb; vnodeCfg.pTfs = pMgmt->pTfs; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 6c38d8626c..cad89399a3 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -433,12 +433,6 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { pHeartbeat->connId = htonl(pHeartbeat->connId); pHeartbeat->pid = htonl(pHeartbeat->pid); - SRpcConnInfo info = {0}; - if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) { - mError("user:%s, connId:%d failed to process hb since %s", pReq->user, pHeartbeat->connId, terrstr()); - return -1; - } - SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId); if (pConn == NULL) { pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0); From 5015a6a78b252bbe523e5cd45c8a3e12f2a5df45 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 1 Apr 2022 21:27:47 +0800 Subject: [PATCH 09/11] shm --- include/util/tprocess.h | 4 +- source/dnode/mgmt/bm/src/bmInt.c | 6 +-- source/dnode/mgmt/main/inc/dnd.h | 2 +- source/dnode/mgmt/main/src/dndExec.c | 13 +++-- source/dnode/mgmt/main/src/dndTransport.c | 58 ++++++++++++----------- source/util/src/tprocess.c | 8 ++-- 6 files changed, 49 insertions(+), 42 deletions(-) diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 3a47450eec..bc0c8fe5a4 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -22,7 +22,7 @@ extern "C" { #endif -typedef enum { PROC_REQ, PROC_RSP, PROC_REG, PROC_RELEASE } ProcFuncType; +typedef enum { PROC_QUEUE, PROC_REQ, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType; typedef struct SProcQueue SProcQueue; typedef struct SProcObj SProcObj; @@ -53,7 +53,7 @@ void taosProcCleanup(SProcObj *pProc); int32_t taosProcRun(SProcObj *pProc); int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, ProcFuncType ftype); -int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, +void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, ProcFuncType ftype); #ifdef __cplusplus diff --git a/source/dnode/mgmt/bm/src/bmInt.c b/source/dnode/mgmt/bm/src/bmInt.c index cbaf247a5b..4ca7afbb6d 100644 --- a/source/dnode/mgmt/bm/src/bmInt.c +++ b/source/dnode/mgmt/bm/src/bmInt.c @@ -19,11 +19,7 @@ static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { - SMsgCb msgCb = {0}; - msgCb.pWrapper = pMgmt->pWrapper; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/main/inc/dnd.h b/source/dnode/mgmt/main/inc/dnd.h index b9c760c980..255a739add 100644 --- a/source/dnode/mgmt/main/inc/dnd.h +++ b/source/dnode/mgmt/main/inc/dnd.h @@ -149,7 +149,7 @@ int32_t dndInitClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index 54cadb177a..8837da6d48 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -90,10 +90,10 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, ProcFuncType ftype) { pMsg->pCont = pCont; - dTrace("msg:%p, get from parent queue, handle:%p app:%p", pMsg, pMsg->handle, pMsg->ahandle); + dTrace("msg:%p, get from parent queue, ftype:%d handle:%p, app:%p", pMsg, ftype, pMsg->handle, pMsg->ahandle); switch (ftype) { - case PROC_REG: + case PROC_REGIST: rpcRegisterBrokenLinkArg(pMsg); break; case PROC_RELEASE: @@ -101,11 +101,14 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t rpcFreeCont(pCont); break; case PROC_REQ: - // todo send to dnode dndSendReqToMnode(pWrapper, pMsg); - default: + // dndSendReq(pWrapper, (const SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); + break; + case PROC_RSP: dndSendRpcRsp(pWrapper, pMsg); break; + default: + break; } taosMemoryFree(pMsg); } @@ -325,6 +328,8 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { } } + dndSetStatus(pDnode, DND_STAT_RUNNING); + if (taosProcRun(pWrapper->pProc) != 0) { dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); return -1; diff --git a/source/dnode/mgmt/main/src/dndTransport.c b/source/dnode/mgmt/main/src/dndTransport.c index a87937bc8d..d463f2ba7a 100644 --- a/source/dnode/mgmt/main/src/dndTransport.c +++ b/source/dnode/mgmt/main/src/dndTransport.c @@ -319,22 +319,6 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *p return 0; } -int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { - if (pWrapper->procType != PROC_CHILD) { - SDnode *pDnode = pWrapper->pDnode; - if (dndGetStatus(pDnode) != DND_STAT_RUNNING) { - terrno = TSDB_CODE_DND_OFFLINE; - dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle); - return -1; - } - return dndSendRpcReq(&pDnode->trans, pEpSet, pReq); - } else { - while (taosProcPutToParentQ(pWrapper->pProc, pReq, sizeof(SRpcMsg), pReq->pCont, pReq->contLen, PROC_REQ) != 0) { - taosMsleep(1); - } - } -} - int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { SDnode *pDnode = pWrapper->pDnode; STransMgmt *pTrans = &pDnode->trans; @@ -362,13 +346,37 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { } } +int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { + SDnode *pDnode = pWrapper->pDnode; + if (dndGetStatus(pDnode) != DND_STAT_RUNNING) { + terrno = TSDB_CODE_DND_OFFLINE; + dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle); + return -1; + } + + if (pWrapper->procType != PROC_CHILD) { + return dndSendRpcReq(&pDnode->trans, pEpSet, pReq); + } else { + int32_t headLen = sizeof(SRpcMsg) + sizeof(SEpSet); + char *pHead = taosMemoryMalloc(headLen); + if (pHead == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + memcpy(pHead, pReq, sizeof(SRpcMsg)); + memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet)); + + taosProcPutToParentQ(pWrapper->pProc, pReq, headLen, pReq->pCont, pReq->contLen, PROC_REQ); + taosMemoryFree(pHead); + return 0; + } +} + void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { if (pWrapper->procType != PROC_CHILD) { dndSendRpcRsp(pWrapper, pRsp); } else { - while (taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP) != 0) { - taosMsleep(1); - } + taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP); } } @@ -376,9 +384,7 @@ void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { if (pWrapper->procType != PROC_CHILD) { rpcRegisterBrokenLinkArg(pMsg); } else { - while (taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REG) != 0) { - taosMsleep(1); - } + taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGIST); } } @@ -387,19 +393,17 @@ static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) rpcReleaseHandle(handle, type); } else { SRpcMsg msg = {.handle = handle, .code = type}; - while (taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE) != 0) { - taosMsleep(1); - } + taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE); } } SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) { SMsgCb msgCb = { .pWrapper = pWrapper, + .sendReqFp = dndSendReq, + .sendRspFp = dndSendRsp, .registerBrokenLinkArgFp = dndRegisterBrokenLinkArg, .releaseHandleFp = dndReleaseHandle, - .sendReqFp = dndSendReqToDnode, - .sendRspFp = dndSendRsp, }; return msgCb; } \ No newline at end of file diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 1d41bd4a48..600413068c 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -434,7 +434,9 @@ int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, ftype); } -int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - ProcFuncType ftype) { - return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype); +void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, + ProcFuncType ftype) { + while (taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype) != 0) { + taosMsleep(1); + } } From 8e8728a06924959ebc5a7034342c892708098835 Mon Sep 17 00:00:00 2001 From: Shengliang Date: Sat, 2 Apr 2022 09:47:55 +0800 Subject: [PATCH 10/11] [sync cases] --- tests/script/jenkins/basic.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index fc87776658..d8fbfce55f 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -40,7 +40,8 @@ # ---- tmq ./test.sh -f tsim/tmq/basic.sim ./test.sh -f tsim/tmq/basic1.sim -./test.sh -f tsim/tmq/oneTopic.sim +#./test.sh -f tsim/tmq/oneTopic.sim +#./test.sh -f tsim/tmq/multiTopic.sim # --- stable ./test.sh -f tsim/stable/disk.sim From ed2fa5a8fad88338f544f389b069b5e711b17bbc Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 2 Apr 2022 11:21:27 +0800 Subject: [PATCH 11/11] fix uv crash --- source/os/src/osProc.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/os/src/osProc.c b/source/os/src/osProc.c index 262160c033..1cdd41ad78 100644 --- a/source/os/src/osProc.c +++ b/source/os/src/osProc.c @@ -23,9 +23,9 @@ int32_t taosNewProc(char **args) { int32_t pid = fork(); if (pid == 0) { args[0] = tsProcPath; - close(STDIN_FILENO); + // close(STDIN_FILENO); close(STDOUT_FILENO); - close(STDERR_FILENO); + // close(STDERR_FILENO); return execvp(tsProcPath, args); } else { return pid;