TD-10431 process create vnode msg

This commit is contained in:
Shengliang Guan 2021-12-19 14:43:58 +08:00
parent 197a9f4ea9
commit 6c1e5acd82
5 changed files with 182 additions and 144 deletions

View File

@ -754,6 +754,7 @@ typedef struct {
int32_t dnodeId; int32_t dnodeId;
char db[TSDB_FULL_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
uint64_t dbUid; uint64_t dbUid;
int32_t vgVersion;
int32_t cacheBlockSize; int32_t cacheBlockSize;
int32_t totalBlocks; int32_t totalBlocks;
int32_t daysPerFile; int32_t daysPerFile;

View File

@ -17,11 +17,23 @@
#include "dndVnodes.h" #include "dndVnodes.h"
#include "dndTransport.h" #include "dndTransport.h"
typedef struct {
int32_t vgId;
int32_t vgVersion;
int8_t dropped;
uint64_t dbUid;
char db[TSDB_FULL_DB_NAME_LEN];
char path[PATH_MAX + 20];
} SWrapperCfg;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int32_t refCount; int32_t refCount;
int32_t vgVersion;
int8_t dropped; int8_t dropped;
int8_t accessState; int8_t accessState;
uint64_t dbUid;
char *db;
char *path; char *path;
SVnode *pImpl; SVnode *pImpl;
taos_queue pWriteQ; taos_queue pWriteQ;
@ -32,13 +44,13 @@ typedef struct {
} SVnodeObj; } SVnodeObj;
typedef struct { typedef struct {
int32_t vnodeNum; int32_t vnodeNum;
int32_t opened; int32_t opened;
int32_t failed; int32_t failed;
int32_t threadIndex; int32_t threadIndex;
pthread_t *pThreadId; pthread_t *pThreadId;
SVnodeObj *pVnodes; SDnode *pDnode;
SDnode * pDnode; SWrapperCfg *pCfgs;
} SVnodeThread; } SVnodeThread;
static int32_t dndInitVnodeReadWorker(SDnode *pDnode); static int32_t dndInitVnodeReadWorker(SDnode *pDnode);
@ -73,16 +85,14 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEp
void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg); static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg);
static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId); static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId);
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode); static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl); static int32_t dndCreateVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl);
static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode); static void dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode);
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes); static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes);
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_t *numOfVnodes); static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
static int32_t dndWriteVnodesToFile(SDnode *pDnode); static int32_t dndWriteVnodesToFile(SDnode *pDnode);
static int32_t dndCreateVnode(SDnode *pDnode, int32_t vgId, SVnodeCfg *pCfg);
static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndOpenVnodes(SDnode *pDnode); static int32_t dndOpenVnodes(SDnode *pDnode);
static void dndCloseVnodes(SDnode *pDnode); static void dndCloseVnodes(SDnode *pDnode);
@ -126,22 +136,25 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
} }
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl) { static int32_t dndCreateVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodeObj * pVnode = calloc(1, sizeof(SVnodeObj)); SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) { if (pVnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pVnode->vgId = vgId; pVnode->vgId = pCfg->vgId;
pVnode->refCount = 0; pVnode->refCount = 1;
pVnode->dropped = 0; pVnode->dropped = 0;
pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->accessState = TSDB_VN_ALL_ACCCESS;
pVnode->pImpl = pImpl; pVnode->pImpl = pImpl;
pVnode->vgVersion = pCfg->vgVersion;
pVnode->dbUid = pCfg->dbUid;
pVnode->db = tstrdup(pCfg->db);
pVnode->path = tstrdup(pCfg->path);
pVnode->path = tstrdup(path); if (pVnode->path == NULL || pVnode->db == NULL) {
if (pVnode->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
@ -167,7 +180,7 @@ static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, S
} }
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
int32_t code = taosHashPut(pMgmt->hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
if (code != 0) { if (code != 0) {
@ -176,7 +189,7 @@ static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, S
return code; return code;
} }
static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode) { static void dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
@ -195,6 +208,9 @@ static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode) {
dndFreeVnodeWriteQueue(pDnode, pVnode); dndFreeVnodeWriteQueue(pDnode, pVnode);
dndFreeVnodeApplyQueue(pDnode, pVnode); dndFreeVnodeApplyQueue(pDnode, pVnode);
dndFreeVnodeSyncQueue(pDnode, pVnode); dndFreeVnodeSyncQueue(pDnode, pVnode);
free(pVnode->path);
free(pVnode->db);
free(pVnode);
} }
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) { static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
@ -208,16 +224,16 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
void *pIter = taosHashIterate(pMgmt->hash, NULL); void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) { while (pIter) {
SVnodeObj **ppVnode = pIter; SVnodeObj **ppVnode = pIter;
SVnodeObj * pVnode = *ppVnode; SVnodeObj *pVnode = *ppVnode;
if (pVnode) { if (pVnode && num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
pVnodes[num] = (*ppVnode);
num++; num++;
if (num < size) { pIter = taosHashIterate(pMgmt->hash, pIter);
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); } else {
dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); taosHashCancelIterate(pMgmt->hash, pIter);
pVnodes[num] = (*ppVnode);
}
} }
pIter = taosHashIterate(pMgmt->hash, pIter);
} }
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
@ -226,15 +242,15 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
return pVnodes; return pVnodes;
} }
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_t *numOfVnodes) { static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR; int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR;
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 30000; int32_t maxLen = 30000;
char * content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
cJSON * root = NULL; cJSON *root = NULL;
FILE * fp = NULL; FILE *fp = NULL;
char file[PATH_MAX + 20] = {0}; char file[PATH_MAX + 20] = {0};
SVnodeObj *pVnodes = NULL; SWrapperCfg *pCfgs = NULL;
snprintf(file, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes); snprintf(file, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes);
@ -270,31 +286,55 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_
goto PRASE_VNODE_OVER; goto PRASE_VNODE_OVER;
} }
pVnodes = calloc(vnodesNum, sizeof(SVnodeObj)); pCfgs = calloc(vnodesNum, sizeof(SWrapperCfg));
if (pVnodes == NULL) { if (pCfgs == NULL) {
dError("failed to read %s since out of memory", file); dError("failed to read %s since out of memory", file);
goto PRASE_VNODE_OVER; goto PRASE_VNODE_OVER;
} }
for (int32_t i = 0; i < vnodesNum; ++i) { for (int32_t i = 0; i < vnodesNum; ++i) {
cJSON * vnode = cJSON_GetArrayItem(vnodes, i); cJSON *vnode = cJSON_GetArrayItem(vnodes, i);
SVnodeObj *pVnode = &pVnodes[i]; SWrapperCfg *pCfg = &pCfgs[i];
cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
if (!vgId || vgId->type != cJSON_String) { if (!vgId || vgId->type != cJSON_String) {
dError("failed to read %s since vgId not found", file); dError("failed to read %s since vgId not found", file);
goto PRASE_VNODE_OVER; goto PRASE_VNODE_OVER;
} }
pVnode->vgId = atoi(vgId->valuestring); pCfg->vgId = atoi(vgId->valuestring);
snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCfg->vgId);
cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped"); cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped");
if (!dropped || dropped->type != cJSON_String) { if (!dropped || dropped->type != cJSON_String) {
dError("failed to read %s since dropped not found", file); dError("failed to read %s since dropped not found", file);
goto PRASE_VNODE_OVER; goto PRASE_VNODE_OVER;
} }
pVnode->dropped = atoi(vnode->valuestring); pCfg->dropped = atoi(dropped->valuestring);
cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion");
if (!vgVersion || vgVersion->type != cJSON_String) {
dError("failed to read %s since vgVersion not found", file);
goto PRASE_VNODE_OVER;
}
pCfg->vgVersion = atoi(vgVersion->valuestring);
cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid");
if (!dbUid || dbUid->type != cJSON_String) {
dError("failed to read %s since dbUid not found", file);
goto PRASE_VNODE_OVER;
}
pCfg->dbUid = atoll(dbUid->valuestring);
cJSON *db = cJSON_GetObjectItem(vnode, "db");
if (!db || db->type != cJSON_String) {
dError("failed to read %s since db not found", file);
goto PRASE_VNODE_OVER;
}
tstrncpy(pCfg->db, db->valuestring, TSDB_FULL_DB_NAME_LEN);
} }
*ppCfgs = pCfgs;
*numOfVnodes = vnodesNum;
code = 0; code = 0;
dInfo("succcessed to read file %s", file); dInfo("succcessed to read file %s", file);
@ -313,7 +353,7 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
snprintf(realfile, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes); snprintf(realfile, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes);
FILE *fp = fopen(file, "w"); FILE *fp = fopen(file, "w");
if (fp != NULL) { if (fp == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to write %s since %s", file, terrstr()); dError("failed to write %s since %s", file, terrstr());
return -1; return -1;
@ -321,7 +361,7 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 30000; int32_t maxLen = 30000;
char * content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes); SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);
@ -330,7 +370,10 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
for (int32_t i = 0; i < numOfVnodes; ++i) { for (int32_t i = 0; i < numOfVnodes; ++i) {
SVnodeObj *pVnode = pVnodes[i]; SVnodeObj *pVnode = pVnodes[i];
len += snprintf(content + len, maxLen - len, " \"vgId\": \"%d\",\n", pVnode->vgId); len += snprintf(content + len, maxLen - len, " \"vgId\": \"%d\",\n", pVnode->vgId);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\"\n", pVnode->dropped); len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pVnode->dropped);
len += snprintf(content + len, maxLen - len, " \"vgVersion\": \"%d\",\n", pVnode->vgVersion);
len += snprintf(content + len, maxLen - len, " \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid);
len += snprintf(content + len, maxLen - len, " \"db\": \"%s\"\n", pVnode->db);
if (i < numOfVnodes - 1) { if (i < numOfVnodes - 1) {
len += snprintf(content + len, maxLen - len, " },{\n"); len += snprintf(content + len, maxLen - len, " },{\n");
} else { } else {
@ -358,74 +401,29 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
return taosRenameFile(file, realfile); return taosRenameFile(file, realfile);
} }
static int32_t dndCreateVnode(SDnode *pDnode, int32_t vgId, SVnodeCfg *pCfg) {
char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path), "%s/vnode%d", pDnode->dir.vnodes, vgId);
// SVnode *pImpl = vnodeCreate(vgId, path, pCfg);
SVnode *pImpl = vnodeOpen(path, NULL);
if (pImpl == NULL) {
return -1;
}
int32_t code = dndCreateVnodeWrapper(pDnode, vgId, path, pImpl);
if (code != 0) {
vnodeClose(pImpl);
vnodeDestroy(path);
terrno = code;
return code;
}
code = dndWriteVnodesToFile(pDnode);
if (code != 0) {
vnodeClose(pImpl);
vnodeDestroy(path);
terrno = code;
return code;
}
return 0;
}
static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) {
pVnode->dropped = 1;
if (dndWriteVnodesToFile(pDnode) != 0) {
pVnode->dropped = 0;
return -1;
}
dndDropVnodeWrapper(pDnode, pVnode);
vnodeClose(pVnode->pImpl);
vnodeDestroy(pVnode->path);
dndWriteVnodesToFile(pDnode);
return 0;
}
static void *dnodeOpenVnodeFunc(void *param) { static void *dnodeOpenVnodeFunc(void *param) {
SVnodeThread *pThread = param; SVnodeThread *pThread = param;
SDnode * pDnode = pThread->pDnode; SDnode *pDnode = pThread->pDnode;
SVnodesMgmt * pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("open-vnodes"); setThreadName("open-vnodes");
for (int32_t v = 0; v < pThread->vnodeNum; ++v) { for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
SVnodeObj *pVnode = &pThread->pVnodes[v]; SWrapperCfg *pCfg = &pThread->pCfgs[v];
char stepDesc[TSDB_STEP_DESC_LEN] = {0}; char stepDesc[TSDB_STEP_DESC_LEN] = {0};
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pVnode->vgId, snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
pMgmt->openVnodes, pMgmt->totalVnodes); pMgmt->openVnodes, pMgmt->totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc); dndReportStartup(pDnode, "open-vnodes", stepDesc);
char path[PATH_MAX + 20] = {0}; SVnode *pImpl = vnodeOpen(pCfg->path, NULL);
snprintf(path, sizeof(path), "%s/vnode%d", pDnode->dir.vnodes, pVnode->vgId);
SVnode *pImpl = vnodeOpen(path, NULL);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pVnode->vgId, pThread->threadIndex); dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++; pThread->failed++;
} else { } else {
dndCreateVnodeWrapper(pDnode, pVnode->vgId, path, pImpl); dndCreateVnode(pDnode, pCfg, pImpl);
dDebug("vgId:%d, is opened by thread:%d", pVnode->vgId, pThread->threadIndex); dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->opened++; pThread->opened++;
} }
@ -448,9 +446,9 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
return -1; return -1;
} }
SVnodeObj *pVnodes = NULL; SWrapperCfg *pCfgs = NULL;
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
if (dndGetVnodesFromFile(pDnode, &pVnodes, &numOfVnodes) != 0) { if (dndGetVnodesFromFile(pDnode, &pCfgs, &numOfVnodes) != 0) {
dInfo("failed to get vnode list from disk since %s", terrstr()); dInfo("failed to get vnode list from disk since %s", terrstr());
return -1; return -1;
} }
@ -463,13 +461,13 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread)); SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
for (int32_t t = 0; t < threadNum; ++t) { for (int32_t t = 0; t < threadNum; ++t) {
threads[t].threadIndex = t; threads[t].threadIndex = t;
threads[t].pVnodes = calloc(vnodesPerThread, sizeof(SVnodeObj)); threads[t].pCfgs = calloc(vnodesPerThread, sizeof(SWrapperCfg));
} }
for (int32_t v = 0; v < numOfVnodes; ++v) { for (int32_t v = 0; v < numOfVnodes; ++v) {
int32_t t = v % threadNum; int32_t t = v % threadNum;
SVnodeThread *pThread = &threads[t]; SVnodeThread *pThread = &threads[t];
pThread->pVnodes[pThread->vnodeNum++] = pVnodes[v]; pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
} }
dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes); dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);
@ -488,9 +486,10 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
SVnodeThread *pThread = &threads[t]; SVnodeThread *pThread = &threads[t];
taosDestoryThread(pThread->pThreadId); taosDestoryThread(pThread->pThreadId);
pThread->pThreadId = NULL; pThread->pThreadId = NULL;
free(pThread->pVnodes); free(pThread->pCfgs);
} }
free(threads); free(threads);
free(pCfgs);
if (pMgmt->openVnodes != pMgmt->totalVnodes) { if (pMgmt->openVnodes != pMgmt->totalVnodes) {
dError("there are total vnodes:%d, opened:%d", pMgmt->totalVnodes, pMgmt->openVnodes); dError("there are total vnodes:%d, opened:%d", pMgmt->totalVnodes, pMgmt->openVnodes);
@ -508,7 +507,7 @@ static void dndCloseVnodes(SDnode *pDnode) {
SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes); SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);
for (int32_t i = 0; i < numOfVnodes; ++i) { for (int32_t i = 0; i < numOfVnodes; ++i) {
dndDropVnodeWrapper(pDnode, pVnodes[i]); dndDropVnode(pDnode, pVnodes[i]);
} }
if (pVnodes != NULL) { if (pVnodes != NULL) {
@ -523,11 +522,12 @@ static void dndCloseVnodes(SDnode *pDnode) {
dInfo("total vnodes:%d are all closed", numOfVnodes); dInfo("total vnodes:%d are all closed", numOfVnodes);
} }
static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg *pCfg) { static SCreateVnodeMsg *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = rpcMsg->pCont; SCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->vgId = htonl(pCreate->vgId); pCreate->vgId = htonl(pCreate->vgId);
pCreate->dnodeId = htonl(pCreate->dnodeId); pCreate->dnodeId = htonl(pCreate->dnodeId);
pCreate->dbUid = htobe64(pCreate->dbUid); pCreate->dbUid = htobe64(pCreate->dbUid);
pCreate->vgVersion = htonl(pCreate->vgVersion);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCreate->totalBlocks = htonl(pCreate->totalBlocks); pCreate->totalBlocks = htonl(pCreate->totalBlocks);
pCreate->daysPerFile = htonl(pCreate->daysPerFile); pCreate->daysPerFile = htonl(pCreate->daysPerFile);
@ -544,9 +544,10 @@ static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg
pReplica->port = htons(pReplica->port); pReplica->port = htons(pReplica->port);
} }
*vgId = pCreate->vgId; return pCreate;
}
#if 0 static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) {
pCfg->wsize = pCreate->cacheBlockSize; pCfg->wsize = pCreate->cacheBlockSize;
pCfg->ssize = pCreate->cacheBlockSize; pCfg->ssize = pCreate->cacheBlockSize;
pCfg->wsize = pCreate->cacheBlockSize; pCfg->wsize = pCreate->cacheBlockSize;
@ -567,8 +568,15 @@ static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg
pCfg->walCfg.rollPeriod = 128; pCfg->walCfg.rollPeriod = 128;
pCfg->walCfg.segSize = 128; pCfg->walCfg.segSize = 128;
pCfg->walCfg.vgId = pCreate->vgId; pCfg->walCfg.vgId = pCreate->vgId;
#endif }
return 0;
static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeMsg *pCreate, SWrapperCfg *pCfg) {
memcpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN);
pCfg->dbUid = pCreate->dbUid;
pCfg->dropped = 0;
snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCreate->vgId);
pCfg->vgId = pCreate->vgId;
pCfg->vgVersion = pCreate->vgVersion;
} }
static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) { static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) {
@ -584,42 +592,61 @@ static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) {
} }
static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = dndParseCreateVnodeReq(rpcMsg);
dDebug("vgId:%d, create vnode req is received", pCreate->vgId);
SVnodeCfg vnodeCfg = {0}; SVnodeCfg vnodeCfg = {0};
int32_t vgId = 0; dndGenerateVnodeCfg(pCreate, &vnodeCfg);
dndParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg); SWrapperCfg wrapperCfg = {0};
dDebug("vgId:%d, create vnode req is received", vgId); dndGenerateWrapperCfg(pDnode, pCreate, &wrapperCfg);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, pCreate->vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
dDebug("vgId:%d, already exist, return success", vgId); dDebug("vgId:%d, already exist, return success", pCreate->vgId);
dndReleaseVnode(pDnode, pVnode); dndReleaseVnode(pDnode, pVnode);
return 0; return 0;
} }
if (dndCreateVnode(pDnode, vgId, &vnodeCfg) != 0) { SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/);
dError("vgId:%d, failed to create vnode since %s", vgId, terrstr()); if (pImpl == NULL) {
return terrno; return -1;
}
int32_t code = dndCreateVnode(pDnode, &wrapperCfg, pImpl);
if (code != 0) {
vnodeClose(pImpl);
vnodeDestroy(wrapperCfg.path);
terrno = code;
return code;
}
code = dndWriteVnodesToFile(pDnode);
if (code != 0) {
vnodeClose(pImpl);
vnodeDestroy(wrapperCfg.path);
terrno = code;
return code;
} }
return 0; return 0;
} }
static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SAlterVnodeMsg *pAlter = (SAlterVnodeMsg *)dndParseCreateVnodeReq(rpcMsg);
dDebug("vgId:%d, alter vnode req is received", pAlter->vgId);
SVnodeCfg vnodeCfg = {0}; SVnodeCfg vnodeCfg = {0};
int32_t vgId = 0; dndGenerateVnodeCfg(pAlter, &vnodeCfg);
dndParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg); SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId);
dDebug("vgId:%d, alter vnode req is received", vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
dDebug("vgId:%d, failed to alter vnode since %s", vgId, terrstr()); dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
return terrno; return terrno;
} }
if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) { if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) {
dError("vgId:%d, failed to alter vnode since %s", vgId, terrstr()); dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
dndReleaseVnode(pDnode, pVnode); dndReleaseVnode(pDnode, pVnode);
return terrno; return terrno;
} }
@ -640,12 +667,17 @@ static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return terrno; return terrno;
} }
if (dndDropVnode(pDnode, pVnode) != 0) { pVnode->dropped = 1;
dError("vgId:%d, failed to drop vnode since %s", vgId, terrstr()); if (dndWriteVnodesToFile(pDnode) != 0) {
dndReleaseVnode(pDnode, pVnode); pVnode->dropped = 0;
return terrno; return terrno;
} }
dndDropVnode(pDnode, pVnode);
vnodeClose(pVnode->pImpl);
vnodeDestroy(pVnode->path);
dndWriteVnodesToFile(pDnode);
return 0; return 0;
} }
@ -738,12 +770,10 @@ static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
break; break;
} }
if (code != 0) { SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; rpcSendResponse(&rsp);
rpcSendResponse(&rsp); rpcFreeCont(pMsg->pCont);
rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg);
taosFreeQitem(pMsg);
}
} }
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) {
@ -756,7 +786,7 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) {
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs); SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs);
SRpcMsg * pRpcMsg = NULL; SRpcMsg *pRpcMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg); taosGetQitem(qall, (void **)&pRpcMsg);
@ -1029,7 +1059,7 @@ static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) {
} }
static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) { static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) {
SVnodesMgmt * pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SMWorkerPool *pPool = &pMgmt->writePool; SMWorkerPool *pPool = &pMgmt->writePool;
pPool->name = "vnode-write"; pPool->name = "vnode-write";
pPool->max = pDnode->opt.numOfCores; pPool->max = pDnode->opt.numOfCores;
@ -1137,12 +1167,12 @@ void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) {
pLoads->num = taosHashGetSize(pMgmt->hash); pLoads->num = taosHashGetSize(pMgmt->hash);
int32_t v = 0; int32_t v = 0;
void * pIter = taosHashIterate(pMgmt->hash, NULL); void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) { while (pIter) {
SVnodeObj **ppVnode = pIter; SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue; if (ppVnode == NULL || *ppVnode == NULL) continue;
SVnodeObj * pVnode = *ppVnode; SVnodeObj *pVnode = *ppVnode;
SVnodeLoad *pLoad = &pLoads->data[v++]; SVnodeLoad *pLoad = &pLoads->data[v++];
vnodeGetLoad(pVnode->pImpl, pLoad); vnodeGetLoad(pVnode->pImpl, pLoad);

View File

@ -176,6 +176,12 @@ SDnode *dndInit(SDnodeOpt *pOption) {
return NULL; return NULL;
} }
if (vnodeInit(1) != 0) {
dError("failed to init vnode env");
dndCleanup(pDnode);
return NULL;
}
if (dndInitDnode(pDnode) != 0) { if (dndInitDnode(pDnode) != 0) {
dError("failed to init dnode"); dError("failed to init dnode");
dndCleanup(pDnode); dndCleanup(pDnode);
@ -222,8 +228,10 @@ void dndCleanup(SDnode *pDnode) {
dndCleanupMnode(pDnode); dndCleanupMnode(pDnode);
dndCleanupVnodes(pDnode); dndCleanupVnodes(pDnode);
dndCleanupDnode(pDnode); dndCleanupDnode(pDnode);
vnodeClear();
walCleanUp(); walCleanUp();
rpcCleanup(); rpcCleanup();
dndCleanupEnv(pDnode); dndCleanupEnv(pDnode);
free(pDnode); free(pDnode);
dInfo("TDengine is cleaned up successfully"); dInfo("TDengine is cleaned up successfully");

View File

@ -176,7 +176,6 @@ SServer* DndTestVgroup::pServer;
SClient* DndTestVgroup::pClient; SClient* DndTestVgroup::pClient;
int32_t DndTestVgroup::connId; int32_t DndTestVgroup::connId;
TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
{ {
SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg)); SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg));
@ -184,6 +183,7 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
pReq->dnodeId = htonl(1); pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1"); strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527); pReq->dbUid = htobe64(9527);
pReq->vgVersion = htonl(1);
pReq->cacheBlockSize = htonl(16); pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10); pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10); pReq->daysPerFile = htonl(10);
@ -217,8 +217,6 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
SRpcMsg* pMsg = pClient->pRsp; SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
taosMsleep(1000000); // taosMsleep(1000000);
} }
} }

View File

@ -168,6 +168,7 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb
pCreate->vgId = htonl(pVgroup->vgId); pCreate->vgId = htonl(pVgroup->vgId);
memcpy(pCreate->db, pDb->name, TSDB_FULL_DB_NAME_LEN); memcpy(pCreate->db, pDb->name, TSDB_FULL_DB_NAME_LEN);
pCreate->dbUid = htobe64(pDb->uid); pCreate->dbUid = htobe64(pDb->uid);
pCreate->vgVersion = htonl(pVgroup->version);
pCreate->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize); pCreate->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize);
pCreate->totalBlocks = htonl(pDb->cfg.totalBlocks); pCreate->totalBlocks = htonl(pDb->cfg.totalBlocks);
pCreate->daysPerFile = htonl(pDb->cfg.daysPerFile); pCreate->daysPerFile = htonl(pDb->cfg.daysPerFile);