diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index f9904f7fa8..9cb47d367b 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -33,6 +33,8 @@ #include "tcq.h" //#include "tsync.h" +#define TSDB_VNODE_VERSION_CONTENT_LEN 31 + static int32_t tsOpennedVnodes; static void *tsDnodeVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); @@ -41,7 +43,7 @@ static int vnodeWalCallback(void *arg); static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); static int32_t vnodeReadCfg(SVnodeObj *pVnode); static int32_t vnodeSaveVersion(SVnodeObj *pVnode); -static bool vnodeReadVersion(SVnodeObj *pVnode); +static int32_t vnodeReadVersion(SVnodeObj *pVnode); static int vnodeWalCallback(void *arg); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); @@ -108,7 +110,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock; tsdbCfg.precision = pVnodeCfg->cfg.precision; tsdbCfg.compression = pVnodeCfg->cfg.compression;; - + char tsdbDir[TSDB_FILENAME_LEN] = {0}; sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL); @@ -134,7 +136,7 @@ int32_t vnodeDrop(int32_t vgId) { dTrace("pVnode:%p vgId:%d, vnode will be dropped", pVnode, pVnode->vgId); pVnode->status = TAOS_VN_STATUS_DELETING; vnodeCleanUp(pVnode); - + return TSDB_CODE_SUCCESS; } @@ -177,26 +179,46 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { int32_t vnodeOpen(int32_t vnode, char *rootDir) { char temp[TSDB_FILENAME_LEN]; + int32_t code; pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1); + if (pVnode == NULL) { + code = TSDB_CODE_NO_RESOURCE; + goto vnodeOpenError; + } + pVnode->vgId = vnode; pVnode->status = TAOS_VN_STATUS_INIT; pVnode->refCount = 1; - pVnode->version = 0; + pVnode->version = 0; taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode)); - - int32_t code = vnodeReadCfg(pVnode); + + code = vnodeReadCfg(pVnode); if (code != TSDB_CODE_SUCCESS) { dError("pVnode:%p vgId:%d, failed to read cfg file", pVnode, pVnode->vgId); - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - return code; + goto vnodeOpenError; + } + + code = vnodeReadVersion(pVnode); + if (code != TSDB_CODE_SUCCESS) { + dError("pVnode:%p vgId:%d, failed to read version file", pVnode, pVnode->vgId); + goto vnodeOpenError; } - vnodeReadVersion(pVnode); - pVnode->wqueue = dnodeAllocateWqueue(pVnode); + if (pVnode->wqueue == NULL) { + dError("pVnode:%p vgId:%d, failed to allocate Wqueue", pVnode, pVnode->vgId); + code = TSDB_CODE_NO_RESOURCE; + goto vnodeOpenError; + } + pVnode->rqueue = dnodeAllocateRqueue(pVnode); + if (pVnode->wqueue == NULL) { + dError("pVnode:%p vgId:%d, failed to allocate Rqueue", pVnode, pVnode->vgId); + code = TSDB_CODE_NO_RESOURCE; + goto vnodeOpenError; + } SCqCfg cqCfg = {0}; sprintf(cqCfg.user, "root"); @@ -214,8 +236,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->tsdb = tsdbOpenRepo(temp, &appH); if (pVnode->tsdb == NULL) { dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - return terrno; + code = TSDB_CODE_VG_INIT_FAILED; + goto vnodeOpenError; } sprintf(temp, "%s/wal", rootDir); @@ -231,12 +253,12 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.getWalInfo = vnodeGetWalInfo; syncInfo.getFileInfo = vnodeGetFileInfo; syncInfo.writeToCache = vnodeWriteToQueue; - syncInfo.confirmForward = dnodeSendRpcWriteRsp; + syncInfo.confirmForward = dnodeSendRpcWriteRsp; syncInfo.notifyRole = vnodeNotifyRole; pVnode->sync = syncStart(&syncInfo); // start continuous query - if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) cqStart(pVnode->cq); pVnode->events = NULL; @@ -246,6 +268,18 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { atomic_add_fetch_32(&tsOpennedVnodes, 1); return TSDB_CODE_SUCCESS; + +vnodeOpenError: + if (pVnode != NULL && pVnode->wqueue != NULL) { + dnodeFreeWqueue(pVnode->wqueue); + } + if (pVnode != NULL && pVnode->rqueue != NULL) { + dnodeFreeRqueue(pVnode->rqueue); + } + if (pVnode != NULL) { + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + } + return code; } int32_t vnodeClose(int32_t vgId) { @@ -320,7 +354,7 @@ void *vnodeAccquireVnode(int32_t vgId) { } void *vnodeGetRqueue(void *pVnode) { - return ((SVnodeObj *)pVnode)->rqueue; + return ((SVnodeObj *)pVnode)->rqueue; } void *vnodeGetWqueue(int32_t vgId) { @@ -330,7 +364,7 @@ void *vnodeGetWqueue(int32_t vgId) { } void *vnodeGetWal(void *pVnode) { - return ((SVnodeObj *)pVnode)->wal; + return ((SVnodeObj *)pVnode)->wal; } void vnodeBuildStatusMsg(void *param) { @@ -398,9 +432,9 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { SVnodeObj *pVnode = ahandle; pVnode->role = role; - if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) cqStart(pVnode->cq); - else + else cqStop(pVnode->cq); } @@ -411,12 +445,16 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { if (!fp) { dError("vgId:%d, failed to open vnode cfg file for write, file:%s error:%s", pVnodeCfg->cfg.vgId, cfgFile, strerror(errno)); - return errno; + return TSDB_CODE_OTHERS; } int32_t len = 0; int32_t maxLen = 1000; char * content = calloc(1, maxLen + 1); + if (content == NULL) { + fclose(fp); + return TSDB_CODE_NO_RESOURCE; + } len += snprintf(content + len, maxLen - len, "{\n"); @@ -430,14 +468,14 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2); len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock); len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock); - len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime); + len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime); len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision); len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression); len += snprintf(content + len, maxLen - len, " \"commitLog\": %d,\n", pVnodeCfg->cfg.commitLog); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals); len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum); - + len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) { len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId); @@ -457,7 +495,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { dPrint("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId); - return 0; + return TSDB_CODE_SUCCESS; } static int32_t vnodeReadCfg(SVnodeObj *pVnode) { @@ -467,12 +505,17 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { if (!fp) { dError("pVnode:%p vgId:%d, failed to open vnode cfg file for read, file:%s, error:%s", pVnode, pVnode->vgId, cfgFile, strerror(errno)); - return errno; + return TSDB_CODE_OTHERS; } int ret = TSDB_CODE_OTHERS; int maxLen = 1000; char *content = calloc(1, maxLen + 1); + if (content == NULL) { + fclose(fp); + return TSDB_CODE_NO_RESOURCE; + } + int len = fread(content, 1, maxLen, fp); if (len <= 0) { free(content); @@ -640,7 +683,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC; } - ret = 0; + ret = TSDB_CODE_SUCCESS; dPrint("pVnode:%p vgId:%d, read vnode cfg successed, replcia:%d", pVnode, pVnode->vgId, pVnode->syncCfg.replica); for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) { @@ -662,12 +705,12 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { if (!fp) { dError("pVnode:%p vgId:%d, failed to open vnode version file for write, file:%s error:%s", pVnode, pVnode->vgId, versionFile, strerror(errno)); - return errno; + return TSDB_CODE_OTHERS; } int32_t len = 0; int32_t maxLen = 30; - char * content = calloc(1, maxLen + 1); + char content[TSDB_VNODE_VERSION_CONTENT_LEN] = {0}; len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->version); @@ -675,32 +718,31 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { fwrite(content, 1, len, fp); fclose(fp); - free(content); dPrint("pVnode:%p vgId:%d, save vnode version:%" PRId64 " successed", pVnode, pVnode->vgId, pVnode->version); - return 0; + return TSDB_CODE_SUCCESS; } -static bool vnodeReadVersion(SVnodeObj *pVnode) { +static int32_t vnodeReadVersion(SVnodeObj *pVnode) { char versionFile[TSDB_FILENAME_LEN + 30] = {0}; sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId); FILE *fp = fopen(versionFile, "r"); if (!fp) { dTrace("pVnode:%p vgId:%d, failed to open version file:%s error:%s", pVnode, pVnode->vgId, versionFile, strerror(errno)); - return false; + return TSDB_CODE_OTHERS; } bool ret = false; int maxLen = 100; - char *content = calloc(1, maxLen + 1); + char content[TSDB_VNODE_VERSION_CONTENT_LEN] = {0}; + int len = fread(content, 1, maxLen, fp); if (len <= 0) { - free(content); fclose(fp); dPrint("pVnode:%p vgId:%d, failed to read vnode version, content is null", pVnode, pVnode->vgId); - return false; + return TSDB_CODE_OTHERS; } cJSON *root = cJSON_Parse(content); @@ -716,12 +758,11 @@ static bool vnodeReadVersion(SVnodeObj *pVnode) { } pVnode->version = version->valueint; - ret = true; + ret = TSDB_CODE_SUCCESS; dPrint("pVnode:%p vgId:%d, read vnode version successed, version:%%" PRId64, pVnode, pVnode->vgId, pVnode->version); PARSE_OVER: - free(content); cJSON_Delete(root); fclose(fp); return ret;