From aef124af3ee2a6ff5541975c7ec7cabef5dc2308 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 20 Dec 2021 10:05:10 +0800 Subject: [PATCH] TD-10431 testing for create/drop vnode msg --- source/dnode/mgmt/impl/src/dndVnodes.c | 74 ++++++---- source/dnode/mgmt/impl/test/sut/deploy.cpp | 2 +- source/dnode/mgmt/impl/test/vgroup/vgroup.cpp | 139 +++++++++++++----- 3 files changed, 150 insertions(+), 65 deletions(-) diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 024df14202..8aa41544c4 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -87,8 +87,8 @@ static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnode static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId); static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndCreateVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl); -static void dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode); +static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl); +static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode); static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes); static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); static int32_t dndWriteVnodesToFile(SDnode *pDnode); @@ -136,7 +136,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); } -static int32_t dndCreateVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { +static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); if (pVnode == NULL) { @@ -189,7 +189,7 @@ static int32_t dndCreateVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) return code; } -static void dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) { +static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; taosWLockLatch(&pMgmt->latch); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); @@ -297,26 +297,26 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_ SWrapperCfg *pCfg = &pCfgs[i]; cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); - if (!vgId || vgId->type != cJSON_String) { + if (!vgId || vgId->type != cJSON_Number) { dError("failed to read %s since vgId not found", file); goto PRASE_VNODE_OVER; } - pCfg->vgId = atoi(vgId->valuestring); + pCfg->vgId = vgId->valueint; snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCfg->vgId); cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped"); - if (!dropped || dropped->type != cJSON_String) { + if (!dropped || dropped->type != cJSON_Number) { dError("failed to read %s since dropped not found", file); goto PRASE_VNODE_OVER; } - pCfg->dropped = atoi(dropped->valuestring); + pCfg->dropped = dropped->valueint; cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion"); - if (!vgVersion || vgVersion->type != cJSON_String) { + if (!vgVersion || vgVersion->type != cJSON_Number) { dError("failed to read %s since vgVersion not found", file); goto PRASE_VNODE_OVER; } - pCfg->vgVersion = atoi(vgVersion->valuestring); + pCfg->vgVersion = vgVersion->valueint; cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid"); if (!dbUid || dbUid->type != cJSON_String) { @@ -358,28 +358,30 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { dError("failed to write %s since %s", file, terrstr()); return -1; } - - int32_t len = 0; - int32_t maxLen = 30000; - char *content = calloc(1, maxLen + 1); int32_t numOfVnodes = 0; SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes); + int32_t len = 0; + int32_t maxLen = 65536; + char *content = calloc(1, maxLen + 1); + len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"vnodes\": [{\n"); + len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n"); for (int32_t i = 0; i < numOfVnodes; ++i) { SVnodeObj *pVnode = pVnodes[i]; - len += snprintf(content + len, maxLen - len, " \"vgId\": \"%d\",\n", pVnode->vgId); - len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pVnode->dropped); - len += snprintf(content + len, maxLen - len, " \"vgVersion\": \"%d\",\n", pVnode->vgVersion); - len += snprintf(content + len, maxLen - len, " \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid); - len += snprintf(content + len, maxLen - len, " \"db\": \"%s\"\n", pVnode->db); + len += snprintf(content + len, maxLen - len, " {\n"); + len += snprintf(content + len, maxLen - len, " \"vgId\": %d,\n", pVnode->vgId); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pVnode->dropped); + len += snprintf(content + len, maxLen - len, " \"vgVersion\": %d,\n", pVnode->vgVersion); + len += snprintf(content + len, maxLen - len, " \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid); + len += snprintf(content + len, maxLen - len, " \"db\": \"%s\"\n", pVnode->db); if (i < numOfVnodes - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); + len += snprintf(content + len, maxLen - len, " },\n"); } else { - len += snprintf(content + len, maxLen - len, " }]\n"); + len += snprintf(content + len, maxLen - len, " }\n"); } } + len += snprintf(content + len, maxLen - len, " ]\n"); len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); @@ -422,7 +424,7 @@ static void *dnodeOpenVnodeFunc(void *param) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; } else { - dndCreateVnode(pDnode, pCfg, pImpl); + dndOpenVnode(pDnode, pCfg, pImpl); dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->opened++; } @@ -507,7 +509,8 @@ static void dndCloseVnodes(SDnode *pDnode) { SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes); for (int32_t i = 0; i < numOfVnodes; ++i) { - dndDropVnode(pDnode, pVnodes[i]); + dndReleaseVnode(pDnode, pVnodes[i]); + dndCloseVnode(pDnode, pVnodes[i]); } if (pVnodes != NULL) { @@ -613,7 +616,7 @@ static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return -1; } - int32_t code = dndCreateVnode(pDnode, &wrapperCfg, pImpl); + int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl); if (code != 0) { vnodeClose(pImpl); vnodeDestroy(wrapperCfg.path); @@ -639,20 +642,36 @@ static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SVnodeCfg vnodeCfg = {0}; dndGenerateVnodeCfg(pAlter, &vnodeCfg); + SWrapperCfg wrapperCfg = {0}; + dndGenerateWrapperCfg(pDnode, pAlter, &wrapperCfg); + SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId); if (pVnode == NULL) { dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); return terrno; } + if (wrapperCfg.vgVersion == pVnode->vgVersion) { + dndReleaseVnode(pDnode, pVnode); + dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId); + return 0; + } + if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) { dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); dndReleaseVnode(pDnode, pVnode); return terrno; } + int32_t oldVersion = pVnode->vgVersion; + pVnode->vgVersion = wrapperCfg.vgVersion; + int32_t code = dndWriteVnodesToFile(pDnode); + if (code != 0) { + pVnode->vgVersion = oldVersion; + } + dndReleaseVnode(pDnode, pVnode); - return 0; + return code; } static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { @@ -673,7 +692,8 @@ static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return terrno; } - dndDropVnode(pDnode, pVnode); + dndReleaseVnode(pDnode, pVnode); + dndCloseVnode(pDnode, pVnode); vnodeClose(pVnode->pImpl); vnodeDestroy(pVnode->path); dndWriteVnodesToFile(pDnode); diff --git a/source/dnode/mgmt/impl/test/sut/deploy.cpp b/source/dnode/mgmt/impl/test/sut/deploy.cpp index f2010b5813..7cf469f8e2 100644 --- a/source/dnode/mgmt/impl/test/sut/deploy.cpp +++ b/source/dnode/mgmt/impl/test/sut/deploy.cpp @@ -16,7 +16,7 @@ #include "deploy.h" void initLog(const char* path) { - dDebugFlag = 143; + dDebugFlag = 207; vDebugFlag = 0; mDebugFlag = 207; cDebugFlag = 0; diff --git a/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp b/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp index 1465e069aa..9ce69ce3be 100644 --- a/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp +++ b/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp @@ -178,45 +178,110 @@ int32_t DndTestVgroup::connId; TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { { - SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg)); - pReq->vgId = htonl(2); - pReq->dnodeId = htonl(1); - strcpy(pReq->db, "1.d1"); - pReq->dbUid = htobe64(9527); - pReq->vgVersion = htonl(1); - pReq->cacheBlockSize = htonl(16); - pReq->totalBlocks = htonl(10); - pReq->daysPerFile = htonl(10); - pReq->daysToKeep0 = htonl(3650); - pReq->daysToKeep1 = htonl(3650); - pReq->daysToKeep2 = htonl(3650); - pReq->minRows = htonl(100); - pReq->minRows = htonl(4096); - pReq->commitTime = htonl(3600); - pReq->fsyncPeriod = htonl(3000); - pReq->walLevel = 1; - pReq->precision = 0; - pReq->compression = 2; - pReq->replica = 1; - pReq->quorum = 1; - pReq->update = 0; - pReq->cacheLastRow = 0; - pReq->selfIndex = 0; - for (int r = 0; r < pReq->replica; ++r) { - SReplica* pReplica = &pReq->replicas[r]; - pReplica->id = htonl(1); - pReplica->port = htons(9150); + for (int i = 0; i < 3; ++i) { + SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg)); + pReq->vgId = htonl(2); + pReq->dnodeId = htonl(1); + strcpy(pReq->db, "1.d1"); + pReq->dbUid = htobe64(9527); + pReq->vgVersion = htonl(1); + pReq->cacheBlockSize = htonl(16); + pReq->totalBlocks = htonl(10); + pReq->daysPerFile = htonl(10); + pReq->daysToKeep0 = htonl(3650); + pReq->daysToKeep1 = htonl(3650); + pReq->daysToKeep2 = htonl(3650); + pReq->minRows = htonl(100); + pReq->minRows = htonl(4096); + pReq->commitTime = htonl(3600); + pReq->fsyncPeriod = htonl(3000); + pReq->walLevel = 1; + pReq->precision = 0; + pReq->compression = 2; + pReq->replica = 1; + pReq->quorum = 1; + pReq->update = 0; + pReq->cacheLastRow = 0; + pReq->selfIndex = 0; + for (int r = 0; r < pReq->replica; ++r) { + SReplica* pReplica = &pReq->replicas[r]; + pReplica->id = htonl(1); + pReplica->port = htons(9150); + } + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCreateVnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); } + } - SRpcMsg rpcMsg = {0}; - rpcMsg.pCont = pReq; - rpcMsg.contLen = sizeof(SCreateVnodeMsg); - rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN; + { + for (int i = 0; i < 3; ++i) { + SAlterVnodeMsg* pReq = (SAlterVnodeMsg*)rpcMallocCont(sizeof(SAlterVnodeMsg)); + pReq->vgId = htonl(2); + pReq->dnodeId = htonl(1); + strcpy(pReq->db, "1.d1"); + pReq->dbUid = htobe64(9527); + pReq->vgVersion = htonl(2); + pReq->cacheBlockSize = htonl(16); + pReq->totalBlocks = htonl(10); + pReq->daysPerFile = htonl(10); + pReq->daysToKeep0 = htonl(3650); + pReq->daysToKeep1 = htonl(3650); + pReq->daysToKeep2 = htonl(3650); + pReq->minRows = htonl(100); + pReq->minRows = htonl(4096); + pReq->commitTime = htonl(3600); + pReq->fsyncPeriod = htonl(3000); + pReq->walLevel = 1; + pReq->precision = 0; + pReq->compression = 2; + pReq->replica = 1; + pReq->quorum = 1; + pReq->update = 0; + pReq->cacheLastRow = 0; + pReq->selfIndex = 0; + for (int r = 0; r < pReq->replica; ++r) { + SReplica* pReplica = &pReq->replicas[r]; + pReplica->id = htonl(1); + pReplica->port = htons(9150); + } - sendMsg(pClient, &rpcMsg); - SRpcMsg* pMsg = pClient->pRsp; - ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, 0); - // taosMsleep(1000000); + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SAlterVnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + } + } + + { + for (int i = 0; i < 3; ++i) { + SDropVnodeMsg* pReq = (SDropVnodeMsg*)rpcMallocCont(sizeof(SDropVnodeMsg)); + pReq->vgId = htonl(2); + pReq->dnodeId = htonl(1); + strcpy(pReq->db, "1.d1"); + pReq->dbUid = htobe64(9527); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCreateVnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + } } }