diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 98798a6235..59fde8e9c8 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -754,6 +754,7 @@ typedef struct { int32_t dnodeId; char db[TSDB_FULL_DB_NAME_LEN]; uint64_t dbUid; + int32_t vgVersion; int32_t cacheBlockSize; int32_t totalBlocks; int32_t daysPerFile; diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index d3f1b06a4a..024df14202 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -17,11 +17,23 @@ #include "dndVnodes.h" #include "dndTransport.h" +typedef struct { + int32_t vgId; + int32_t vgVersion; + int8_t dropped; + uint64_t dbUid; + char db[TSDB_FULL_DB_NAME_LEN]; + char path[PATH_MAX + 20]; +} SWrapperCfg; + typedef struct { int32_t vgId; int32_t refCount; + int32_t vgVersion; int8_t dropped; int8_t accessState; + uint64_t dbUid; + char *db; char *path; SVnode *pImpl; taos_queue pWriteQ; @@ -32,13 +44,13 @@ typedef struct { } SVnodeObj; typedef struct { - int32_t vnodeNum; - int32_t opened; - int32_t failed; - int32_t threadIndex; - pthread_t *pThreadId; - SVnodeObj *pVnodes; - SDnode * pDnode; + int32_t vnodeNum; + int32_t opened; + int32_t failed; + int32_t threadIndex; + pthread_t *pThreadId; + SDnode *pDnode; + SWrapperCfg *pCfgs; } SVnodeThread; static int32_t dndInitVnodeReadWorker(SDnode *pDnode); @@ -73,16 +85,14 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEp void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg); -static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId); +static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId); static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl); -static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode); +static int32_t dndCreateVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl); +static void dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode); static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes); -static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_t *numOfVnodes); +static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); static int32_t dndWriteVnodesToFile(SDnode *pDnode); -static int32_t dndCreateVnode(SDnode *pDnode, int32_t vgId, SVnodeCfg *pCfg); -static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode); static int32_t dndOpenVnodes(SDnode *pDnode); static void dndCloseVnodes(SDnode *pDnode); @@ -126,22 +136,25 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); } -static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl) { +static int32_t dndCreateVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SVnodeObj * pVnode = calloc(1, sizeof(SVnodeObj)); + SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); if (pVnode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pVnode->vgId = vgId; - pVnode->refCount = 0; + pVnode->vgId = pCfg->vgId; + pVnode->refCount = 1; pVnode->dropped = 0; pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->pImpl = pImpl; + pVnode->vgVersion = pCfg->vgVersion; + pVnode->dbUid = pCfg->dbUid; + pVnode->db = tstrdup(pCfg->db); + pVnode->path = tstrdup(pCfg->path); - pVnode->path = tstrdup(path); - if (pVnode->path == NULL) { + if (pVnode->path == NULL || pVnode->db == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -167,7 +180,7 @@ static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, S } taosWLockLatch(&pMgmt->latch); - int32_t code = taosHashPut(pMgmt->hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); + int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); taosWUnLockLatch(&pMgmt->latch); if (code != 0) { @@ -176,7 +189,7 @@ static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, S return code; } -static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode) { +static void dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; taosWLockLatch(&pMgmt->latch); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); @@ -195,6 +208,9 @@ static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode) { dndFreeVnodeWriteQueue(pDnode, pVnode); dndFreeVnodeApplyQueue(pDnode, pVnode); dndFreeVnodeSyncQueue(pDnode, pVnode); + free(pVnode->path); + free(pVnode->db); + free(pVnode); } static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) { @@ -208,16 +224,16 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) { void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; - SVnodeObj * pVnode = *ppVnode; - if (pVnode) { + SVnodeObj *pVnode = *ppVnode; + if (pVnode && num < size) { + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); + pVnodes[num] = (*ppVnode); num++; - if (num < size) { - int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); - dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); - pVnodes[num] = (*ppVnode); - } + pIter = taosHashIterate(pMgmt->hash, pIter); + } else { + taosHashCancelIterate(pMgmt->hash, pIter); } - pIter = taosHashIterate(pMgmt->hash, pIter); } taosRUnLockLatch(&pMgmt->latch); @@ -226,15 +242,15 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) { return pVnodes; } -static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_t *numOfVnodes) { - int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR; - int32_t len = 0; - int32_t maxLen = 30000; - char * content = calloc(1, maxLen + 1); - cJSON * root = NULL; - FILE * fp = NULL; - char file[PATH_MAX + 20] = {0}; - SVnodeObj *pVnodes = NULL; +static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) { + int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR; + int32_t len = 0; + int32_t maxLen = 30000; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + FILE *fp = NULL; + char file[PATH_MAX + 20] = {0}; + SWrapperCfg *pCfgs = NULL; snprintf(file, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes); @@ -270,31 +286,55 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_ goto PRASE_VNODE_OVER; } - pVnodes = calloc(vnodesNum, sizeof(SVnodeObj)); - if (pVnodes == NULL) { + pCfgs = calloc(vnodesNum, sizeof(SWrapperCfg)); + if (pCfgs == NULL) { dError("failed to read %s since out of memory", file); goto PRASE_VNODE_OVER; } for (int32_t i = 0; i < vnodesNum; ++i) { - cJSON * vnode = cJSON_GetArrayItem(vnodes, i); - SVnodeObj *pVnode = &pVnodes[i]; + cJSON *vnode = cJSON_GetArrayItem(vnodes, i); + SWrapperCfg *pCfg = &pCfgs[i]; cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); if (!vgId || vgId->type != cJSON_String) { dError("failed to read %s since vgId not found", file); goto PRASE_VNODE_OVER; } - pVnode->vgId = atoi(vgId->valuestring); + pCfg->vgId = atoi(vgId->valuestring); + snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCfg->vgId); cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped"); if (!dropped || dropped->type != cJSON_String) { dError("failed to read %s since dropped not found", file); goto PRASE_VNODE_OVER; } - pVnode->dropped = atoi(vnode->valuestring); + pCfg->dropped = atoi(dropped->valuestring); + + cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion"); + if (!vgVersion || vgVersion->type != cJSON_String) { + dError("failed to read %s since vgVersion not found", file); + goto PRASE_VNODE_OVER; + } + pCfg->vgVersion = atoi(vgVersion->valuestring); + + cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid"); + if (!dbUid || dbUid->type != cJSON_String) { + dError("failed to read %s since dbUid not found", file); + goto PRASE_VNODE_OVER; + } + pCfg->dbUid = atoll(dbUid->valuestring); + + cJSON *db = cJSON_GetObjectItem(vnode, "db"); + if (!db || db->type != cJSON_String) { + dError("failed to read %s since db not found", file); + goto PRASE_VNODE_OVER; + } + tstrncpy(pCfg->db, db->valuestring, TSDB_FULL_DB_NAME_LEN); } + *ppCfgs = pCfgs; + *numOfVnodes = vnodesNum; code = 0; dInfo("succcessed to read file %s", file); @@ -313,7 +353,7 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { snprintf(realfile, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes); FILE *fp = fopen(file, "w"); - if (fp != NULL) { + if (fp == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to write %s since %s", file, terrstr()); return -1; @@ -321,7 +361,7 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { int32_t len = 0; int32_t maxLen = 30000; - char * content = calloc(1, maxLen + 1); + char *content = calloc(1, maxLen + 1); int32_t numOfVnodes = 0; SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes); @@ -330,7 +370,10 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { for (int32_t i = 0; i < numOfVnodes; ++i) { SVnodeObj *pVnode = pVnodes[i]; len += snprintf(content + len, maxLen - len, " \"vgId\": \"%d\",\n", pVnode->vgId); - len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\"\n", pVnode->dropped); + len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pVnode->dropped); + len += snprintf(content + len, maxLen - len, " \"vgVersion\": \"%d\",\n", pVnode->vgVersion); + len += snprintf(content + len, maxLen - len, " \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid); + len += snprintf(content + len, maxLen - len, " \"db\": \"%s\"\n", pVnode->db); if (i < numOfVnodes - 1) { len += snprintf(content + len, maxLen - len, " },{\n"); } else { @@ -358,74 +401,29 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { return taosRenameFile(file, realfile); } -static int32_t dndCreateVnode(SDnode *pDnode, int32_t vgId, SVnodeCfg *pCfg) { - char path[PATH_MAX + 20] = {0}; - snprintf(path, sizeof(path), "%s/vnode%d", pDnode->dir.vnodes, vgId); - // SVnode *pImpl = vnodeCreate(vgId, path, pCfg); - - SVnode *pImpl = vnodeOpen(path, NULL); - if (pImpl == NULL) { - return -1; - } - - int32_t code = dndCreateVnodeWrapper(pDnode, vgId, path, pImpl); - if (code != 0) { - vnodeClose(pImpl); - vnodeDestroy(path); - terrno = code; - return code; - } - - code = dndWriteVnodesToFile(pDnode); - if (code != 0) { - vnodeClose(pImpl); - vnodeDestroy(path); - terrno = code; - return code; - } - - return 0; -} - -static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) { - pVnode->dropped = 1; - if (dndWriteVnodesToFile(pDnode) != 0) { - pVnode->dropped = 0; - return -1; - } - - dndDropVnodeWrapper(pDnode, pVnode); - vnodeClose(pVnode->pImpl); - vnodeDestroy(pVnode->path); - dndWriteVnodesToFile(pDnode); - return 0; -} - static void *dnodeOpenVnodeFunc(void *param) { SVnodeThread *pThread = param; - SDnode * pDnode = pThread->pDnode; - SVnodesMgmt * pMgmt = &pDnode->vmgmt; + SDnode *pDnode = pThread->pDnode; + SVnodesMgmt *pMgmt = &pDnode->vmgmt; dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); setThreadName("open-vnodes"); for (int32_t v = 0; v < pThread->vnodeNum; ++v) { - SVnodeObj *pVnode = &pThread->pVnodes[v]; + SWrapperCfg *pCfg = &pThread->pCfgs[v]; char stepDesc[TSDB_STEP_DESC_LEN] = {0}; - snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pVnode->vgId, + snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId, pMgmt->openVnodes, pMgmt->totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - char path[PATH_MAX + 20] = {0}; - snprintf(path, sizeof(path), "%s/vnode%d", pDnode->dir.vnodes, pVnode->vgId); - SVnode *pImpl = vnodeOpen(path, NULL); + SVnode *pImpl = vnodeOpen(pCfg->path, NULL); if (pImpl == NULL) { - dError("vgId:%d, failed to open vnode by thread:%d", pVnode->vgId, pThread->threadIndex); + dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; } else { - dndCreateVnodeWrapper(pDnode, pVnode->vgId, path, pImpl); - dDebug("vgId:%d, is opened by thread:%d", pVnode->vgId, pThread->threadIndex); + dndCreateVnode(pDnode, pCfg, pImpl); + dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->opened++; } @@ -448,9 +446,9 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { return -1; } - SVnodeObj *pVnodes = NULL; - int32_t numOfVnodes = 0; - if (dndGetVnodesFromFile(pDnode, &pVnodes, &numOfVnodes) != 0) { + SWrapperCfg *pCfgs = NULL; + int32_t numOfVnodes = 0; + if (dndGetVnodesFromFile(pDnode, &pCfgs, &numOfVnodes) != 0) { dInfo("failed to get vnode list from disk since %s", terrstr()); return -1; } @@ -463,13 +461,13 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread)); for (int32_t t = 0; t < threadNum; ++t) { threads[t].threadIndex = t; - threads[t].pVnodes = calloc(vnodesPerThread, sizeof(SVnodeObj)); + threads[t].pCfgs = calloc(vnodesPerThread, sizeof(SWrapperCfg)); } for (int32_t v = 0; v < numOfVnodes; ++v) { int32_t t = v % threadNum; SVnodeThread *pThread = &threads[t]; - pThread->pVnodes[pThread->vnodeNum++] = pVnodes[v]; + pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v]; } dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes); @@ -488,9 +486,10 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { SVnodeThread *pThread = &threads[t]; taosDestoryThread(pThread->pThreadId); pThread->pThreadId = NULL; - free(pThread->pVnodes); + free(pThread->pCfgs); } free(threads); + free(pCfgs); if (pMgmt->openVnodes != pMgmt->totalVnodes) { dError("there are total vnodes:%d, opened:%d", pMgmt->totalVnodes, pMgmt->openVnodes); @@ -508,7 +507,7 @@ static void dndCloseVnodes(SDnode *pDnode) { SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes); for (int32_t i = 0; i < numOfVnodes; ++i) { - dndDropVnodeWrapper(pDnode, pVnodes[i]); + dndDropVnode(pDnode, pVnodes[i]); } if (pVnodes != NULL) { @@ -523,11 +522,12 @@ static void dndCloseVnodes(SDnode *pDnode) { dInfo("total vnodes:%d are all closed", numOfVnodes); } -static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg *pCfg) { +static SCreateVnodeMsg *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) { SCreateVnodeMsg *pCreate = rpcMsg->pCont; pCreate->vgId = htonl(pCreate->vgId); pCreate->dnodeId = htonl(pCreate->dnodeId); pCreate->dbUid = htobe64(pCreate->dbUid); + pCreate->vgVersion = htonl(pCreate->vgVersion); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCreate->totalBlocks = htonl(pCreate->totalBlocks); pCreate->daysPerFile = htonl(pCreate->daysPerFile); @@ -544,9 +544,10 @@ static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg pReplica->port = htons(pReplica->port); } - *vgId = pCreate->vgId; + return pCreate; +} -#if 0 +static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) { pCfg->wsize = pCreate->cacheBlockSize; pCfg->ssize = pCreate->cacheBlockSize; pCfg->wsize = pCreate->cacheBlockSize; @@ -567,8 +568,15 @@ static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg pCfg->walCfg.rollPeriod = 128; pCfg->walCfg.segSize = 128; pCfg->walCfg.vgId = pCreate->vgId; -#endif - return 0; +} + +static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeMsg *pCreate, SWrapperCfg *pCfg) { + memcpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN); + pCfg->dbUid = pCreate->dbUid; + pCfg->dropped = 0; + snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCreate->vgId); + pCfg->vgId = pCreate->vgId; + pCfg->vgVersion = pCreate->vgVersion; } static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) { @@ -584,42 +592,61 @@ static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) { } static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { + SCreateVnodeMsg *pCreate = dndParseCreateVnodeReq(rpcMsg); + dDebug("vgId:%d, create vnode req is received", pCreate->vgId); + SVnodeCfg vnodeCfg = {0}; - int32_t vgId = 0; + dndGenerateVnodeCfg(pCreate, &vnodeCfg); - dndParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg); - dDebug("vgId:%d, create vnode req is received", vgId); + SWrapperCfg wrapperCfg = {0}; + dndGenerateWrapperCfg(pDnode, pCreate, &wrapperCfg); - SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); + SVnodeObj *pVnode = dndAcquireVnode(pDnode, pCreate->vgId); if (pVnode != NULL) { - dDebug("vgId:%d, already exist, return success", vgId); + dDebug("vgId:%d, already exist, return success", pCreate->vgId); dndReleaseVnode(pDnode, pVnode); return 0; } - if (dndCreateVnode(pDnode, vgId, &vnodeCfg) != 0) { - dError("vgId:%d, failed to create vnode since %s", vgId, terrstr()); - return terrno; + SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/); + if (pImpl == NULL) { + return -1; + } + + int32_t code = dndCreateVnode(pDnode, &wrapperCfg, pImpl); + if (code != 0) { + vnodeClose(pImpl); + vnodeDestroy(wrapperCfg.path); + terrno = code; + return code; + } + + code = dndWriteVnodesToFile(pDnode); + if (code != 0) { + vnodeClose(pImpl); + vnodeDestroy(wrapperCfg.path); + terrno = code; + return code; } return 0; } static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { + SAlterVnodeMsg *pAlter = (SAlterVnodeMsg *)dndParseCreateVnodeReq(rpcMsg); + dDebug("vgId:%d, alter vnode req is received", pAlter->vgId); + SVnodeCfg vnodeCfg = {0}; - int32_t vgId = 0; + dndGenerateVnodeCfg(pAlter, &vnodeCfg); - dndParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg); - dDebug("vgId:%d, alter vnode req is received", vgId); - - SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); + SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId); if (pVnode == NULL) { - dDebug("vgId:%d, failed to alter vnode since %s", vgId, terrstr()); + dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); return terrno; } if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) { - dError("vgId:%d, failed to alter vnode since %s", vgId, terrstr()); + dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); dndReleaseVnode(pDnode, pVnode); return terrno; } @@ -640,12 +667,17 @@ static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return terrno; } - if (dndDropVnode(pDnode, pVnode) != 0) { - dError("vgId:%d, failed to drop vnode since %s", vgId, terrstr()); - dndReleaseVnode(pDnode, pVnode); + pVnode->dropped = 1; + if (dndWriteVnodesToFile(pDnode) != 0) { + pVnode->dropped = 0; return terrno; } + dndDropVnode(pDnode, pVnode); + vnodeClose(pVnode->pImpl); + vnodeDestroy(pVnode->path); + dndWriteVnodesToFile(pDnode); + return 0; } @@ -738,12 +770,10 @@ static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { break; } - if (code != 0) { - SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; - rpcSendResponse(&rsp); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - } + SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; + rpcSendResponse(&rsp); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); } static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { @@ -756,7 +786,7 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs); - SRpcMsg * pRpcMsg = NULL; + SRpcMsg *pRpcMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pRpcMsg); @@ -1029,7 +1059,7 @@ static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) { } static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) { - SVnodesMgmt * pMgmt = &pDnode->vmgmt; + SVnodesMgmt *pMgmt = &pDnode->vmgmt; SMWorkerPool *pPool = &pMgmt->writePool; pPool->name = "vnode-write"; pPool->max = pDnode->opt.numOfCores; @@ -1137,12 +1167,12 @@ void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) { pLoads->num = taosHashGetSize(pMgmt->hash); int32_t v = 0; - void * pIter = taosHashIterate(pMgmt->hash, NULL); + void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; if (ppVnode == NULL || *ppVnode == NULL) continue; - SVnodeObj * pVnode = *ppVnode; + SVnodeObj *pVnode = *ppVnode; SVnodeLoad *pLoad = &pLoads->data[v++]; vnodeGetLoad(pVnode->pImpl, pLoad); diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index 0dbee0d337..a4996ecb3b 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -176,6 +176,12 @@ SDnode *dndInit(SDnodeOpt *pOption) { return NULL; } + if (vnodeInit(1) != 0) { + dError("failed to init vnode env"); + dndCleanup(pDnode); + return NULL; + } + if (dndInitDnode(pDnode) != 0) { dError("failed to init dnode"); dndCleanup(pDnode); @@ -222,8 +228,10 @@ void dndCleanup(SDnode *pDnode) { dndCleanupMnode(pDnode); dndCleanupVnodes(pDnode); dndCleanupDnode(pDnode); + vnodeClear(); walCleanUp(); rpcCleanup(); + dndCleanupEnv(pDnode); free(pDnode); dInfo("TDengine is cleaned up successfully"); diff --git a/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp b/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp index 3f16cd87d8..1465e069aa 100644 --- a/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp +++ b/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp @@ -176,7 +176,6 @@ SServer* DndTestVgroup::pServer; SClient* DndTestVgroup::pClient; int32_t DndTestVgroup::connId; - TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { { SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg)); @@ -184,6 +183,7 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { pReq->dnodeId = htonl(1); strcpy(pReq->db, "1.d1"); pReq->dbUid = htobe64(9527); + pReq->vgVersion = htonl(1); pReq->cacheBlockSize = htonl(16); pReq->totalBlocks = htonl(10); pReq->daysPerFile = htonl(10); @@ -217,8 +217,6 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { SRpcMsg* pMsg = pClient->pRsp; ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); - taosMsleep(1000000); + // taosMsleep(1000000); } - } - diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index e99fea200b..2b38539963 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -168,6 +168,7 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb pCreate->vgId = htonl(pVgroup->vgId); memcpy(pCreate->db, pDb->name, TSDB_FULL_DB_NAME_LEN); pCreate->dbUid = htobe64(pDb->uid); + pCreate->vgVersion = htonl(pVgroup->version); pCreate->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize); pCreate->totalBlocks = htonl(pDb->cfg.totalBlocks); pCreate->daysPerFile = htonl(pDb->cfg.daysPerFile);