|
|
|
@ -35,7 +35,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode);
|
|
|
|
|
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
|
|
|
|
|
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
|
|
|
|
|
static int32_t vnodeSaveVersion(SVnodeObj *pVnode);
|
|
|
|
|
static bool vnodeReadVersion(SVnodeObj *pVnode);
|
|
|
|
|
static int32_t vnodeReadVersion(SVnodeObj *pVnode);
|
|
|
|
|
static int vnodeProcessTsdbStatus(void *arg, int status);
|
|
|
|
|
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion);
|
|
|
|
|
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
|
|
|
|
@ -46,9 +46,9 @@ static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
|
|
|
|
|
|
|
|
|
|
#ifndef _SYNC
|
|
|
|
|
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
|
|
|
|
|
int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; }
|
|
|
|
|
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; }
|
|
|
|
|
void syncStop(tsync_h shandle) {}
|
|
|
|
|
int syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; }
|
|
|
|
|
int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; }
|
|
|
|
|
int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
|
|
|
|
|
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
|
|
|
|
|
#endif
|
|
|
|
@ -148,35 +148,20 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
|
|
|
|
|
pVnode->status = TAOS_VN_STATUS_UPDATING;
|
|
|
|
|
|
|
|
|
|
int32_t code = vnodeSaveCfg(pVnodeCfg);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
vError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) return code;
|
|
|
|
|
|
|
|
|
|
code = vnodeReadCfg(pVnode);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
vError("vgId:%d, failed to read cfg file", pVnode->vgId);
|
|
|
|
|
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) return code;
|
|
|
|
|
|
|
|
|
|
code = syncReconfig(pVnode->sync, &pVnode->syncCfg);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
vTrace("vgId:%d, failed to alter vnode, canot reconfig sync, result:%s", pVnode->vgId,
|
|
|
|
|
tstrerror(code));
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) return code;
|
|
|
|
|
|
|
|
|
|
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
vTrace("vgId:%d, failed to alter vnode, canot reconfig tsdb, result:%s", pVnode->vgId,
|
|
|
|
|
tstrerror(code));
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) return code;
|
|
|
|
|
|
|
|
|
|
pVnode->status = TAOS_VN_STATUS_READY;
|
|
|
|
|
|
|
|
|
|
vTrace("vgId:%d, vnode is altered", pVnode->vgId);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -185,26 +170,40 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|
|
|
|
pthread_once(&vnodeModuleInit, vnodeInit);
|
|
|
|
|
|
|
|
|
|
SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
|
|
|
|
|
if (pVnode == NULL) {
|
|
|
|
|
vError("vgId:%d, failed to open vnode since no enough memory", vnode);
|
|
|
|
|
return TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
atomic_add_fetch_32(&tsOpennedVnodes, 1);
|
|
|
|
|
atomic_add_fetch_32(&pVnode->refCount, 1);
|
|
|
|
|
|
|
|
|
|
pVnode->vgId = vnode;
|
|
|
|
|
pVnode->status = TAOS_VN_STATUS_INIT;
|
|
|
|
|
pVnode->refCount = 1;
|
|
|
|
|
pVnode->version = 0;
|
|
|
|
|
pVnode->tsdbCfg.tsdbId = pVnode->vgId;
|
|
|
|
|
pVnode->rootDir = strdup(rootDir);
|
|
|
|
|
taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));
|
|
|
|
|
|
|
|
|
|
int32_t code = vnodeReadCfg(pVnode);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
vError("vgId:%d, failed to read cfg file", pVnode->vgId);
|
|
|
|
|
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
|
|
|
|
|
vnodeCleanUp(pVnode);
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
code = vnodeReadVersion(pVnode);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
vnodeCleanUp(pVnode);
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
vnodeReadVersion(pVnode);
|
|
|
|
|
pVnode->fversion = pVnode->version;
|
|
|
|
|
|
|
|
|
|
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
|
|
|
|
|
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
|
|
|
|
|
if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) {
|
|
|
|
|
vnodeCleanUp(pVnode);
|
|
|
|
|
return terrno;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SCqCfg cqCfg = {0};
|
|
|
|
|
sprintf(cqCfg.user, "root");
|
|
|
|
@ -212,22 +211,29 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|
|
|
|
cqCfg.vgId = vnode;
|
|
|
|
|
cqCfg.cqWrite = vnodeWriteToQueue;
|
|
|
|
|
pVnode->cq = cqOpen(pVnode, &cqCfg);
|
|
|
|
|
if (pVnode->cq == NULL) {
|
|
|
|
|
vnodeCleanUp(pVnode);
|
|
|
|
|
return terrno;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
STsdbAppH appH = {0};
|
|
|
|
|
appH.appH = (void *)pVnode;
|
|
|
|
|
appH.notifyStatus = vnodeProcessTsdbStatus;
|
|
|
|
|
appH.cqH = pVnode->cq;
|
|
|
|
|
|
|
|
|
|
sprintf(temp, "%s/tsdb", rootDir);
|
|
|
|
|
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
|
|
|
|
|
if (pVnode->tsdb == NULL) {
|
|
|
|
|
vError("vgId:%d, failed to open tsdb at %s(%s)", pVnode->vgId, temp, tstrerror(terrno));
|
|
|
|
|
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
|
|
|
|
|
vnodeCleanUp(pVnode);
|
|
|
|
|
return terrno;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sprintf(temp, "%s/wal", rootDir);
|
|
|
|
|
pVnode->wal = walOpen(temp, &pVnode->walCfg);
|
|
|
|
|
if (pVnode->wal == NULL) {
|
|
|
|
|
vnodeCleanUp(pVnode);
|
|
|
|
|
return terrno;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
|
|
|
|
|
|
|
|
|
|
SSyncInfo syncInfo;
|
|
|
|
@ -246,6 +252,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|
|
|
|
|
|
|
|
|
#ifndef _SYNC
|
|
|
|
|
pVnode->role = TAOS_SYNC_ROLE_MASTER;
|
|
|
|
|
#else
|
|
|
|
|
if (pVnode->sync == NULL) {
|
|
|
|
|
vnodeCleanUp(pVnode);
|
|
|
|
|
return terrno;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
// start continuous query
|
|
|
|
@ -253,11 +264,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|
|
|
|
cqStart(pVnode->cq);
|
|
|
|
|
|
|
|
|
|
pVnode->events = NULL;
|
|
|
|
|
|
|
|
|
|
pVnode->status = TAOS_VN_STATUS_READY;
|
|
|
|
|
vTrace("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
|
|
|
|
|
|
|
|
|
|
atomic_add_fetch_32(&tsOpennedVnodes, 1);
|
|
|
|
|
taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -286,13 +297,6 @@ void vnodeRelease(void *pVnodeRaw) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tfree(pVnode->rootDir);
|
|
|
|
|
// remove read queue
|
|
|
|
|
dnodeFreeRqueue(pVnode->rqueue);
|
|
|
|
|
pVnode->rqueue = NULL;
|
|
|
|
|
|
|
|
|
|
// remove write queue
|
|
|
|
|
dnodeFreeWqueue(pVnode->wqueue);
|
|
|
|
|
pVnode->wqueue = NULL;
|
|
|
|
|
|
|
|
|
|
if (pVnode->status == TAOS_VN_STATUS_DELETING) {
|
|
|
|
|
char rootDir[TSDB_FILENAME_LEN] = {0};
|
|
|
|
@ -387,14 +391,25 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
|
|
|
|
|
pVnode->sync = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cqClose(pVnode->cq);
|
|
|
|
|
pVnode->cq = NULL;
|
|
|
|
|
if (pVnode->wal)
|
|
|
|
|
walClose(pVnode->wal);
|
|
|
|
|
pVnode->wal = NULL;
|
|
|
|
|
|
|
|
|
|
tsdbCloseRepo(pVnode->tsdb, 1);
|
|
|
|
|
if (pVnode->tsdb)
|
|
|
|
|
tsdbCloseRepo(pVnode->tsdb, 1);
|
|
|
|
|
pVnode->tsdb = NULL;
|
|
|
|
|
|
|
|
|
|
walClose(pVnode->wal);
|
|
|
|
|
pVnode->wal = NULL;
|
|
|
|
|
if (pVnode->cq)
|
|
|
|
|
cqClose(pVnode->cq);
|
|
|
|
|
pVnode->cq = NULL;
|
|
|
|
|
|
|
|
|
|
if (pVnode->wqueue)
|
|
|
|
|
dnodeFreeWqueue(pVnode->wqueue);
|
|
|
|
|
pVnode->wqueue = NULL;
|
|
|
|
|
|
|
|
|
|
if (pVnode->rqueue)
|
|
|
|
|
dnodeFreeRqueue(pVnode->rqueue);
|
|
|
|
|
pVnode->rqueue = NULL;
|
|
|
|
|
|
|
|
|
|
vnodeRelease(pVnode);
|
|
|
|
|
}
|
|
|
|
@ -462,7 +477,8 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
|
|
|
|
if (!fp) {
|
|
|
|
|
vError("vgId:%d, failed to open vnode cfg file for write, file:%s error:%s", pVnodeCfg->cfg.vgId, cfgFile,
|
|
|
|
|
strerror(errno));
|
|
|
|
|
return errno;
|
|
|
|
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
return terrno;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t len = 0;
|
|
|
|
@ -512,27 +528,30 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
|
|
|
|
char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
|
|
|
|
|
cJSON *root = NULL;
|
|
|
|
|
char *content = NULL;
|
|
|
|
|
char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
|
|
|
|
|
int maxLen = 1000;
|
|
|
|
|
|
|
|
|
|
terrno = TSDB_CODE_OTHERS;
|
|
|
|
|
sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId);
|
|
|
|
|
FILE *fp = fopen(cfgFile, "r");
|
|
|
|
|
if (!fp) {
|
|
|
|
|
vError("vgId:%d, failed to open vnode cfg file for read, file:%s, error:%s", pVnode->vgId,
|
|
|
|
|
vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", pVnode->vgId,
|
|
|
|
|
cfgFile, strerror(errno));
|
|
|
|
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
goto PARSE_OVER;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
content = calloc(1, maxLen + 1);
|
|
|
|
|
if (content == NULL) goto PARSE_OVER;
|
|
|
|
|
int len = fread(content, 1, maxLen, fp);
|
|
|
|
|
if (len <= 0) {
|
|
|
|
|
vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId);
|
|
|
|
|
return errno;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int ret = TSDB_CODE_OTHERS;
|
|
|
|
|
int maxLen = 1000;
|
|
|
|
|
char *content = calloc(1, maxLen + 1);
|
|
|
|
|
int len = fread(content, 1, maxLen, fp);
|
|
|
|
|
if (len <= 0) {
|
|
|
|
|
free(content);
|
|
|
|
|
fclose(fp);
|
|
|
|
|
vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cJSON *root = cJSON_Parse(content);
|
|
|
|
|
root = cJSON_Parse(content);
|
|
|
|
|
if (root == NULL) {
|
|
|
|
|
vError("vgId:%d, failed to read vnode cfg, invalid json format", pVnode->vgId);
|
|
|
|
|
goto PARSE_OVER;
|
|
|
|
@ -691,19 +710,19 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
|
|
|
|
pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ret = 0;
|
|
|
|
|
terrno = TSDB_CODE_SUCCESS;
|
|
|
|
|
|
|
|
|
|
vPrint("vgId:%d, read vnode cfg successed, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica);
|
|
|
|
|
vPrint("vgId:%d, read vnode cfg successfully, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica);
|
|
|
|
|
for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) {
|
|
|
|
|
vPrint("vgId:%d, dnode:%d, %s:%d", pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId,
|
|
|
|
|
pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
PARSE_OVER:
|
|
|
|
|
free(content);
|
|
|
|
|
tfree(content);
|
|
|
|
|
cJSON_Delete(root);
|
|
|
|
|
fclose(fp);
|
|
|
|
|
return ret;
|
|
|
|
|
if (fp) fclose(fp);
|
|
|
|
|
return terrno;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
|
|
|
|
@ -713,7 +732,7 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
|
|
|
|
|
if (!fp) {
|
|
|
|
|
vError("vgId:%d, failed to open vnode version file for write, file:%s error:%s", pVnode->vgId,
|
|
|
|
|
versionFile, strerror(errno));
|
|
|
|
|
return errno;
|
|
|
|
|
return TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t len = 0;
|
|
|
|
@ -733,29 +752,33 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool vnodeReadVersion(SVnodeObj *pVnode) {
|
|
|
|
|
char versionFile[TSDB_FILENAME_LEN + 30] = {0};
|
|
|
|
|
static int32_t vnodeReadVersion(SVnodeObj *pVnode) {
|
|
|
|
|
char versionFile[TSDB_FILENAME_LEN + 30] = {0};
|
|
|
|
|
char *content = NULL;
|
|
|
|
|
cJSON *root = NULL;
|
|
|
|
|
int maxLen = 100;
|
|
|
|
|
|
|
|
|
|
terrno = TSDB_CODE_OTHERS;
|
|
|
|
|
sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
|
|
|
|
|
FILE *fp = fopen(versionFile, "r");
|
|
|
|
|
if (!fp) {
|
|
|
|
|
if (errno != ENOENT) {
|
|
|
|
|
vError("vgId:%d, failed to open version file:%s error:%s", pVnode->vgId, versionFile, strerror(errno));
|
|
|
|
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
} else {
|
|
|
|
|
terrno = TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
goto PARSE_OVER;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool ret = false;
|
|
|
|
|
int maxLen = 100;
|
|
|
|
|
char *content = calloc(1, maxLen + 1);
|
|
|
|
|
content = calloc(1, maxLen + 1);
|
|
|
|
|
int len = fread(content, 1, maxLen, fp);
|
|
|
|
|
if (len <= 0) {
|
|
|
|
|
free(content);
|
|
|
|
|
fclose(fp);
|
|
|
|
|
vPrint("vgId:%d, failed to read vnode version, content is null", pVnode->vgId);
|
|
|
|
|
return false;
|
|
|
|
|
vError("vgId:%d, failed to read vnode version, content is null", pVnode->vgId);
|
|
|
|
|
goto PARSE_OVER;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cJSON *root = cJSON_Parse(content);
|
|
|
|
|
root = cJSON_Parse(content);
|
|
|
|
|
if (root == NULL) {
|
|
|
|
|
vError("vgId:%d, failed to read vnode version, invalid json format", pVnode->vgId);
|
|
|
|
|
goto PARSE_OVER;
|
|
|
|
@ -768,13 +791,12 @@ static bool vnodeReadVersion(SVnodeObj *pVnode) {
|
|
|
|
|
}
|
|
|
|
|
pVnode->version = version->valueint;
|
|
|
|
|
|
|
|
|
|
ret = true;
|
|
|
|
|
|
|
|
|
|
vPrint("vgId:%d, read vnode version succeed, version:%" PRId64, pVnode->vgId, pVnode->version);
|
|
|
|
|
terrno = TSDB_CODE_SUCCESS;
|
|
|
|
|
vPrint("vgId:%d, read vnode version successfully, version:%" PRId64, pVnode->vgId, pVnode->version);
|
|
|
|
|
|
|
|
|
|
PARSE_OVER:
|
|
|
|
|
free(content);
|
|
|
|
|
tfree(content);
|
|
|
|
|
cJSON_Delete(root);
|
|
|
|
|
fclose(fp);
|
|
|
|
|
return ret;
|
|
|
|
|
if(fp) fclose(fp);
|
|
|
|
|
return terrno;
|
|
|
|
|
}
|
|
|
|
|