diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 7935bb7ff5..5cc3ce0159 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -18,6 +18,7 @@ #include #include #include +#include #include "taosdef.h" #include "taosmsg.h" #include "tglobal.h" @@ -64,7 +65,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); void *cqOpen(void *ahandle, const SCqCfg *pCfg) { SCqContext *pContext = calloc(sizeof(SCqContext), 1); - if (pContext == NULL) return NULL; + if (pContext == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } strcpy(pContext->user, pCfg->user); strcpy(pContext->pass, pCfg->pass); @@ -82,6 +86,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { void cqClose(void *handle) { SCqContext *pContext = handle; + if (handle == NULL) return; // stop all CQs cqStop(pContext); @@ -106,9 +111,9 @@ void cqClose(void *handle) { void cqStart(void *handle) { SCqContext *pContext = handle; - cTrace("vgId:%d, start all CQs", pContext->vgId); if (pContext->dbConn || pContext->master) return; + cTrace("vgId:%d, start all CQs", pContext->vgId); pthread_mutex_lock(&pContext->mutex); pContext->master = 1; diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 05d1d93cf6..fcf26d22c3 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -94,8 +94,8 @@ typedef void* tsync_h; tsync_h syncStart(const SSyncInfo *); void syncStop(tsync_h shandle); -int syncReconfig(tsync_h shandle, const SSyncCfg *); -int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype); +int32_t syncReconfig(tsync_h shandle, const SSyncCfg *); +int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype); void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code); void syncRecover(tsync_h shandle); // recover from other nodes: int syncGetNodesRole(tsync_h shandle, SNodesRole *); diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index 1e248c9e45..475941dbdb 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -65,6 +65,7 @@ taos_queue taosOpenQueue() { } void taosCloseQueue(taos_queue param) { + if (param == NULL) return; STaosQueue *queue = (STaosQueue *)param; STaosQnode *pTemp; STaosQnode *pNode = queue->head; @@ -224,6 +225,7 @@ taos_qset taosOpenQset() { } void taosCloseQset(taos_qset param) { + if (param == NULL) return; STaosQset *qset = (STaosQset *)param; pthread_mutex_destroy(&qset->mutex); tsem_destroy(&qset->sem); diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index 42fd13b2cd..e6ef73ef57 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -405,19 +405,19 @@ static bool doStopTimer(tmr_obj_t* timer, uint8_t state) { tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param); return reusable; } - + if (state != TIMER_STATE_EXPIRED) { // timer already stopped or cancelled, has nothing to do in this case return false; } - + if (timer->executedBy == taosGetPthreadId()) { // taosTmrReset is called in the timer callback, should do nothing in this // case to avoid dead lock. note taosTmrReset must be the last statement // of the callback funtion, will be a bug otherwise. return false; } - + // timer callback is executing in another thread, we SHOULD wait it to stop, // BUT this may result in dead lock if current thread are holding a lock which // the timer callback need to acquire. so, we HAVE TO return directly. @@ -501,6 +501,7 @@ static void taosTmrModuleInit(void) { tmr_ctrl_t* ctrl = tmrCtrls + i; ctrl->next = ctrl + 1; } + (tmrCtrls + taosMaxTmrCtrl - 1)->next = NULL; unusedTmrCtrl = tmrCtrls; pthread_mutex_init(&tmrCtrlMutex, NULL); @@ -574,12 +575,12 @@ void taosTmrCleanUp(void* handle) { if (numOfTmrCtrl <=0) { taosUninitTimer(); - + taosCleanUpScheduler(tmrQhandle); for (int i = 0; i < tListLen(wheels); i++) { time_wheel_t* wheel = wheels + i; - pthread_mutex_destroy(&wheel->mutex); + pthread_mutex_destroy(&wheel->mutex); free(wheel->slots); } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 6f0b19b0c6..b8bc29550e 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -35,7 +35,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode); static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); static int32_t vnodeReadCfg(SVnodeObj *pVnode); static int32_t vnodeSaveVersion(SVnodeObj *pVnode); -static bool vnodeReadVersion(SVnodeObj *pVnode); +static int32_t vnodeReadVersion(SVnodeObj *pVnode); static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); @@ -46,9 +46,9 @@ static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; #ifndef _SYNC tsync_h syncStart(const SSyncInfo *info) { return NULL; } -int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; } +int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; } void syncStop(tsync_h shandle) {} -int syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } +int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {} #endif @@ -148,35 +148,20 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { pVnode->status = TAOS_VN_STATUS_UPDATING; int32_t code = vnodeSaveCfg(pVnodeCfg); - if (code != TSDB_CODE_SUCCESS) { - vError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code)); - return code; - } + if (code != TSDB_CODE_SUCCESS) return code; code = vnodeReadCfg(pVnode); - if (code != TSDB_CODE_SUCCESS) { - vError("vgId:%d, failed to read cfg file", pVnode->vgId); - taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); - return code; - } + if (code != TSDB_CODE_SUCCESS) return code; code = syncReconfig(pVnode->sync, &pVnode->syncCfg); - if (code != TSDB_CODE_SUCCESS) { - vTrace("vgId:%d, failed to alter vnode, canot reconfig sync, result:%s", pVnode->vgId, - tstrerror(code)); - return code; - } + if (code != TSDB_CODE_SUCCESS) return code; code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); - if (code != TSDB_CODE_SUCCESS) { - vTrace("vgId:%d, failed to alter vnode, canot reconfig tsdb, result:%s", pVnode->vgId, - tstrerror(code)); - return code; - } + if (code != TSDB_CODE_SUCCESS) return code; pVnode->status = TAOS_VN_STATUS_READY; - vTrace("vgId:%d, vnode is altered", pVnode->vgId); + return TSDB_CODE_SUCCESS; } @@ -185,26 +170,40 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1); + if (pVnode == NULL) { + vError("vgId:%d, failed to open vnode since no enough memory", vnode); + return TAOS_SYSTEM_ERROR(errno); + } + + atomic_add_fetch_32(&tsOpennedVnodes, 1); + atomic_add_fetch_32(&pVnode->refCount, 1); + pVnode->vgId = vnode; pVnode->status = TAOS_VN_STATUS_INIT; - pVnode->refCount = 1; pVnode->version = 0; pVnode->tsdbCfg.tsdbId = pVnode->vgId; pVnode->rootDir = strdup(rootDir); - taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); int32_t code = vnodeReadCfg(pVnode); if (code != TSDB_CODE_SUCCESS) { - vError("vgId:%d, failed to read cfg file", pVnode->vgId); - taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); + vnodeCleanUp(pVnode); + return code; + } + + code = vnodeReadVersion(pVnode); + if (code != TSDB_CODE_SUCCESS) { + vnodeCleanUp(pVnode); return code; } - vnodeReadVersion(pVnode); pVnode->fversion = pVnode->version; pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); + if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } SCqCfg cqCfg = {0}; sprintf(cqCfg.user, "root"); @@ -212,22 +211,29 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { cqCfg.vgId = vnode; cqCfg.cqWrite = vnodeWriteToQueue; pVnode->cq = cqOpen(pVnode, &cqCfg); + if (pVnode->cq == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } STsdbAppH appH = {0}; appH.appH = (void *)pVnode; appH.notifyStatus = vnodeProcessTsdbStatus; appH.cqH = pVnode->cq; - sprintf(temp, "%s/tsdb", rootDir); pVnode->tsdb = tsdbOpenRepo(temp, &appH); if (pVnode->tsdb == NULL) { - vError("vgId:%d, failed to open tsdb at %s(%s)", pVnode->vgId, temp, tstrerror(terrno)); - taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); + vnodeCleanUp(pVnode); return terrno; } sprintf(temp, "%s/wal", rootDir); pVnode->wal = walOpen(temp, &pVnode->walCfg); + if (pVnode->wal == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } + walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); SSyncInfo syncInfo; @@ -246,6 +252,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { #ifndef _SYNC pVnode->role = TAOS_SYNC_ROLE_MASTER; +#else + if (pVnode->sync == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } #endif // start continuous query @@ -253,11 +264,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { cqStart(pVnode->cq); pVnode->events = NULL; - pVnode->status = TAOS_VN_STATUS_READY; vTrace("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); - atomic_add_fetch_32(&tsOpennedVnodes, 1); + taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); + return TSDB_CODE_SUCCESS; } @@ -286,13 +297,6 @@ void vnodeRelease(void *pVnodeRaw) { } tfree(pVnode->rootDir); - // remove read queue - dnodeFreeRqueue(pVnode->rqueue); - pVnode->rqueue = NULL; - - // remove write queue - dnodeFreeWqueue(pVnode->wqueue); - pVnode->wqueue = NULL; if (pVnode->status == TAOS_VN_STATUS_DELETING) { char rootDir[TSDB_FILENAME_LEN] = {0}; @@ -387,15 +391,26 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { pVnode->sync = NULL; } - cqClose(pVnode->cq); - pVnode->cq = NULL; - - tsdbCloseRepo(pVnode->tsdb, 1); - pVnode->tsdb = NULL; - - walClose(pVnode->wal); + if (pVnode->wal) + walClose(pVnode->wal); pVnode->wal = NULL; + if (pVnode->tsdb) + tsdbCloseRepo(pVnode->tsdb, 1); + pVnode->tsdb = NULL; + + if (pVnode->cq) + cqClose(pVnode->cq); + pVnode->cq = NULL; + + if (pVnode->wqueue) + dnodeFreeWqueue(pVnode->wqueue); + pVnode->wqueue = NULL; + + if (pVnode->rqueue) + dnodeFreeRqueue(pVnode->rqueue); + pVnode->rqueue = NULL; + vnodeRelease(pVnode); } @@ -462,7 +477,8 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { if (!fp) { vError("vgId:%d, failed to open vnode cfg file for write, file:%s error:%s", pVnodeCfg->cfg.vgId, cfgFile, strerror(errno)); - return errno; + terrno = TAOS_SYSTEM_ERROR(errno); + return terrno; } int32_t len = 0; @@ -512,27 +528,30 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { } static int32_t vnodeReadCfg(SVnodeObj *pVnode) { - char cfgFile[TSDB_FILENAME_LEN + 30] = {0}; + cJSON *root = NULL; + char *content = NULL; + char cfgFile[TSDB_FILENAME_LEN + 30] = {0}; + int maxLen = 1000; + + terrno = TSDB_CODE_OTHERS; sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId); FILE *fp = fopen(cfgFile, "r"); if (!fp) { - vError("vgId:%d, failed to open vnode cfg file for read, file:%s, error:%s", pVnode->vgId, + vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", pVnode->vgId, cfgFile, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto PARSE_OVER; + } + + content = calloc(1, maxLen + 1); + if (content == NULL) goto PARSE_OVER; + int len = fread(content, 1, maxLen, fp); + if (len <= 0) { + vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId); return errno; } - int ret = TSDB_CODE_OTHERS; - int maxLen = 1000; - char *content = calloc(1, maxLen + 1); - int len = fread(content, 1, maxLen, fp); - if (len <= 0) { - free(content); - fclose(fp); - vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId); - return false; - } - - cJSON *root = cJSON_Parse(content); + root = cJSON_Parse(content); if (root == NULL) { vError("vgId:%d, failed to read vnode cfg, invalid json format", pVnode->vgId); goto PARSE_OVER; @@ -691,19 +710,19 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC; } - ret = 0; + terrno = TSDB_CODE_SUCCESS; - vPrint("vgId:%d, read vnode cfg successed, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica); + vPrint("vgId:%d, read vnode cfg successfully, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica); for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) { vPrint("vgId:%d, dnode:%d, %s:%d", pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId, pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort); } PARSE_OVER: - free(content); + tfree(content); cJSON_Delete(root); - fclose(fp); - return ret; + if (fp) fclose(fp); + return terrno; } static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { @@ -713,7 +732,7 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { if (!fp) { vError("vgId:%d, failed to open vnode version file for write, file:%s error:%s", pVnode->vgId, versionFile, strerror(errno)); - return errno; + return TAOS_SYSTEM_ERROR(errno); } int32_t len = 0; @@ -733,29 +752,33 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { return 0; } -static bool vnodeReadVersion(SVnodeObj *pVnode) { - char versionFile[TSDB_FILENAME_LEN + 30] = {0}; +static int32_t vnodeReadVersion(SVnodeObj *pVnode) { + char versionFile[TSDB_FILENAME_LEN + 30] = {0}; + char *content = NULL; + cJSON *root = NULL; + int maxLen = 100; + + terrno = TSDB_CODE_OTHERS; sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId); FILE *fp = fopen(versionFile, "r"); if (!fp) { if (errno != ENOENT) { vError("vgId:%d, failed to open version file:%s error:%s", pVnode->vgId, versionFile, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + } else { + terrno = TSDB_CODE_SUCCESS; } - return false; + goto PARSE_OVER; } - bool ret = false; - int maxLen = 100; - char *content = calloc(1, maxLen + 1); + content = calloc(1, maxLen + 1); int len = fread(content, 1, maxLen, fp); if (len <= 0) { - free(content); - fclose(fp); - vPrint("vgId:%d, failed to read vnode version, content is null", pVnode->vgId); - return false; + vError("vgId:%d, failed to read vnode version, content is null", pVnode->vgId); + goto PARSE_OVER; } - cJSON *root = cJSON_Parse(content); + root = cJSON_Parse(content); if (root == NULL) { vError("vgId:%d, failed to read vnode version, invalid json format", pVnode->vgId); goto PARSE_OVER; @@ -768,13 +791,12 @@ static bool vnodeReadVersion(SVnodeObj *pVnode) { } pVnode->version = version->valueint; - ret = true; - - vPrint("vgId:%d, read vnode version succeed, version:%" PRId64, pVnode->vgId, pVnode->version); + terrno = TSDB_CODE_SUCCESS; + vPrint("vgId:%d, read vnode version successfully, version:%" PRId64, pVnode->vgId, pVnode->version); PARSE_OVER: - free(content); + tfree(content); cJSON_Delete(root); - fclose(fp); - return ret; + if(fp) fclose(fp); + return terrno; } diff --git a/src/wal/src/walMain.c b/src/wal/src/walMain.c index 8d92fac926..ebfc9d98bb 100644 --- a/src/wal/src/walMain.c +++ b/src/wal/src/walMain.c @@ -25,6 +25,7 @@ #include "tlog.h" #include "tchecksum.h" #include "tutil.h" +#include "taoserror.h" #include "twal.h" #include "tqueue.h" @@ -56,7 +57,10 @@ static int walRemoveWalFiles(const char *path); void *walOpen(const char *path, const SWalCfg *pCfg) { SWal *pWal = calloc(sizeof(SWal), 1); - if (pWal == NULL) return NULL; + if (pWal == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } pWal->fd = -1; pWal->max = pCfg->wals; @@ -75,6 +79,7 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { walRenew(pWal); if (pWal->fd <0) { + terrno = TAOS_SYSTEM_ERROR(errno); wError("wal:%s, failed to open", path); pthread_mutex_destroy(&pWal->mutex); free(pWal); @@ -112,9 +117,10 @@ void walClose(void *handle) { } int walRenew(void *handle) { + if (handle == NULL) return 0; SWal *pWal = handle; int code = 0; - + pthread_mutex_lock(&pWal->mutex); if (pWal->fd >=0) { @@ -156,6 +162,7 @@ int walRenew(void *handle) { int walWrite(void *handle, SWalHead *pHead) { SWal *pWal = handle; int code = 0; + if (pWal == NULL) return -1; // no wal if (pWal->level == TAOS_WAL_NOLOG) return 0; @@ -178,6 +185,7 @@ int walWrite(void *handle, SWalHead *pHead) { void walFsync(void *handle) { SWal *pWal = handle; + if (pWal == NULL) return; if (pWal->level == TAOS_WAL_FSYNC && pWal->fd >=0) { if (fsync(pWal->fd) < 0) {