From 6ce5320750ac3d373819668ba875e006955d33fe Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 18 Jul 2024 14:40:49 +0800 Subject: [PATCH] wal/mgmg: use new return macros for wal mgmt --- source/dnode/vnode/src/tq/tqMeta.c | 88 +++++++++++++++--------------- source/libs/wal/src/walMeta.c | 29 +++++----- source/libs/wal/src/walMgmt.c | 29 ++++++---- 3 files changed, 78 insertions(+), 68 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 02184f1d50..b71fd1518a 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -37,7 +37,7 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { } } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1; - if (pHandle->execHandle.execTb.qmsg != NULL){ + if (pHandle->execHandle.execTb.qmsg != NULL) { if (tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg) < 0) return -1; } } @@ -67,7 +67,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { } } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1; - if (!tDecodeIsEnd(pDecoder)){ + if (!tDecodeIsEnd(pDecoder)) { if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg) < 0) return -1; } } @@ -187,9 +187,9 @@ END: } int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { - int32_t code; - int32_t vlen; - void* buf = NULL; + int32_t code; + int32_t vlen; + void* buf = NULL; SEncoder encoder; tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); if (code < 0) { @@ -205,7 +205,6 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { goto end; } - tEncoderInit(&encoder, buf, vlen); code = tEncodeSTqHandle(&encoder, pHandle); @@ -258,7 +257,7 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { return 0; } -static int buildHandle(STQ* pTq, STqHandle* handle){ +static int buildHandle(STQ* pTq, STqHandle* handle) { SVnode* pVnode = pTq->pVnode; int32_t vgId = TD_VID(pVnode); @@ -266,7 +265,8 @@ static int buildHandle(STQ* pTq, STqHandle* handle){ if (handle->pRef == NULL) { return -1; } - walSetRefVer(handle->pRef, handle->snapshotVer); + + TAOS_CHECK_RETURN(walSetRefVer(handle->pRef, handle->snapshotVer)); SReadHandle reader = { .vnode = pVnode, @@ -278,8 +278,8 @@ static int buildHandle(STQ* pTq, STqHandle* handle){ initStorageAPI(&reader.api); if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - handle->execHandle.task = - qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, &handle->execHandle.numOfCols, handle->consumerId); + handle->execHandle.task = qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, + &handle->execHandle.numOfCols, handle->consumerId); if (handle->execHandle.task == NULL) { tqError("cannot create exec task for %s", handle->subKey); return -1; @@ -305,7 +305,7 @@ static int buildHandle(STQ* pTq, STqHandle* handle){ } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0); - if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) { + if (handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) { if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) { tqError("nodesStringToNode error in sub stable, since %s", terrstr()); return -1; @@ -316,13 +316,15 @@ static int buildHandle(STQ* pTq, STqHandle* handle){ handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId); SArray* tbUidList = NULL; - int ret = qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList, handle->execHandle.task); - if(ret != TDB_CODE_SUCCESS) { + int ret = qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList, + handle->execHandle.task); + if (ret != TDB_CODE_SUCCESS) { tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle->subKey, handle->consumerId); taosArrayDestroy(tbUidList); return -1; } - tqInfo("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid); + tqInfo("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, + handle->execHandle.execTb.suid); handle->execHandle.pTqReader = tqReaderOpen(pVnode); tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL); taosArrayDestroy(tbUidList); @@ -330,10 +332,10 @@ static int buildHandle(STQ* pTq, STqHandle* handle){ return 0; } -static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){ +static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle) { int32_t vgId = TD_VID(pTq->pVnode); SDecoder decoder; - int32_t code = 0; + int32_t code = 0; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); code = tDecodeSTqHandle(&decoder, handle); if (code) goto end; @@ -347,34 +349,35 @@ end: return code; } -int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ - int32_t vgId = TD_VID(pTq->pVnode); +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) { + int32_t vgId = TD_VID(pTq->pVnode); memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); handle->consumerId = req->newConsumerId; handle->execHandle.subType = req->subType; handle->fetchMeta = req->withMeta; - if(req->subType == TOPIC_SUB_TYPE__COLUMN){ + if (req->subType == TOPIC_SUB_TYPE__COLUMN) { handle->execHandle.execCol.qmsg = taosStrdup(req->qmsg); - }else if(req->subType == TOPIC_SUB_TYPE__DB){ + } else if (req->subType == TOPIC_SUB_TYPE__DB) { handle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - }else if(req->subType == TOPIC_SUB_TYPE__TABLE){ + } else if (req->subType == TOPIC_SUB_TYPE__TABLE) { handle->execHandle.execTb.suid = req->suid; handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg); } handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal); - if(buildHandle(pTq, handle) < 0){ + if (buildHandle(pTq, handle) < 0) { return -1; } - tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, handle->consumerId, vgId, handle->snapshotVer); + tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, handle->consumerId, + vgId, handle->snapshotVer); return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); } -static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* pExecStoreNew){ +static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* pExecStoreNew) { TBC* pCur = NULL; if (tdbTbcOpen(pExecStoreOld, &pCur, NULL) < 0) { return -1; @@ -385,10 +388,10 @@ static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* pExecS return -1; } - void* pKey = NULL; - int kLen = 0; - void* pVal = NULL; - int vLen = 0; + void* pKey = NULL; + int kLen = 0; + void* pVal = NULL; + int vLen = 0; tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { @@ -419,10 +422,10 @@ int32_t tqMetaTransform(STQ* pTq) { char* maindb = taosMemoryCalloc(1, len); sprintf(maindb, "%s%s%s", pTq->path, TD_DIRSEP, TDB_MAINDB_NAME); - if(!taosCheckExistFile(maindb)){ + if (!taosCheckExistFile(maindb)) { taosMemoryFree(maindb); char* tpath = taosMemoryCalloc(1, len); - if(tpath == NULL){ + if (tpath == NULL) { return -1; } sprintf(tpath, "%s%s%s", pTq->path, TD_DIRSEP, "subscribe"); @@ -437,12 +440,11 @@ int32_t tqMetaTransform(STQ* pTq) { TTB* pCheckStore = NULL; char* offsetNew = NULL; char* offset = tqOffsetBuildFName(pTq->path, 0); - if(offset == NULL){ + if (offset == NULL) { code = -1; goto END; } - if (tdbOpen(pTq->path, 16 * 1024, 1, &pMetaDB, 0, 0, NULL) < 0) { code = -1; goto END; @@ -458,8 +460,8 @@ int32_t tqMetaTransform(STQ* pTq) { goto END; } - char* tpath = taosMemoryCalloc(1, len); - if(tpath == NULL){ + char* tpath = taosMemoryCalloc(1, len); + if (tpath == NULL) { code = -1; goto END; } @@ -471,12 +473,12 @@ int32_t tqMetaTransform(STQ* pTq) { goto END; } - if( tqMetaTransformInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore) < 0){ + if (tqMetaTransformInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore) < 0) { code = -1; goto END; } - if(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore) < 0){ + if (tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore) < 0) { code = -1; goto END; } @@ -489,11 +491,11 @@ int32_t tqMetaTransform(STQ* pTq) { pMetaDB = NULL; offsetNew = tqOffsetBuildFName(pTq->path, 0); - if(offsetNew == NULL){ + if (offsetNew == NULL) { code = -1; goto END; } - if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){ + if (taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0) { tqError("copy offset file error"); code = -1; goto END; @@ -502,7 +504,7 @@ int32_t tqMetaTransform(STQ* pTq) { taosRemoveFile(maindb); taosRemoveFile(offset); - END: +END: taosMemoryFree(maindb); taosMemoryFree(offset); taosMemoryFree(offsetNew); @@ -515,15 +517,15 @@ int32_t tqMetaTransform(STQ* pTq) { } int32_t tqMetaGetHandle(STQ* pTq, const char* key) { - void* pVal = NULL; - int vLen = 0; + void* pVal = NULL; + int vLen = 0; if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &pVal, &vLen) < 0) { return -1; } STqHandle handle = {0}; - int code = restoreHandle(pTq, pVal, vLen, &handle); - if (code < 0){ + int code = restoreHandle(pTq, pVal, vLen, &handle); + if (code < 0) { tqDestroyTqHandle(&handle); } tdbFree(pVal); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 39ec80db87..bbffa7180e 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -873,6 +873,7 @@ static void walUpdateSyncedOffset(SWal* pWal) { } int walSaveMeta(SWal* pWal) { + int code = 0; int metaVer = walFindCurMetaVer(pWal); char fnameStr[WAL_FILE_LEN]; char tmpFnameStr[WAL_FILE_LEN]; @@ -881,14 +882,14 @@ int walSaveMeta(SWal* pWal) { // fsync the idx and log file at first to ensure validity of meta if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pIdxFile) < 0) { wError("vgId:%d, failed to sync idx file due to %s", pWal->cfg.vgId, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pLogFile) < 0) { wError("vgId:%d, failed to sync log file due to %s", pWal->cfg.vgId, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } // update synced offset @@ -897,46 +898,47 @@ int walSaveMeta(SWal* pWal) { // flush to a tmpfile n = walBuildTmpMetaName(pWal, tmpFnameStr); if (n >= sizeof(tmpFnameStr)) { - return -1; + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } TdFilePtr pMetaFile = taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); if (pMetaFile == NULL) { wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } char* serialized = walMetaSerialize(pWal); int len = strlen(serialized); if (pWal->cfg.level != TAOS_WAL_SKIP && len != taosWriteFile(pMetaFile, serialized, len)) { wError("vgId:%d, failed to write file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr); - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); goto _err; } if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pMetaFile) < 0) { wError("vgId:%d, failed to sync file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr); - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); goto _err; } if (taosCloseFile(&pMetaFile) < 0) { wError("vgId:%d, failed to close file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr); - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); goto _err; } // rename it n = walBuildMetaName(pWal, metaVer + 1, fnameStr); if (n >= sizeof(fnameStr)) { + code = TSDB_CODE_FAILED; goto _err; } if (taosRenameFile(tmpFnameStr, fnameStr) < 0) { wError("failed to rename file due to %s. dest:%s", strerror(errno), fnameStr); - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); goto _err; } @@ -945,13 +947,14 @@ int walSaveMeta(SWal* pWal) { walBuildMetaName(pWal, metaVer, fnameStr); taosRemoveFile(fnameStr); } + taosMemoryFree(serialized); - return 0; + return code; _err: taosCloseFile(&pMetaFile); taosMemoryFree(serialized); - return -1; + return code; } int walLoadMeta(SWal* pWal) { diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 3dbaed1bc7..4845ce36dc 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -46,10 +46,11 @@ int32_t walInit() { tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); int32_t code = walCreateThread(); - if (code != 0) { + if (TSDB_CODE_SUCCESS != code) { wError("failed to init wal module since %s", tstrerror(code)); atomic_store_8(&tsWal.inited, 0); - return code; + + TAOS_RETURN(code); } wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId); @@ -171,19 +172,20 @@ _err: taosArrayDestroy(pWal->fileInfoSet); taosHashCleanup(pWal->pRefHash); taosThreadMutexDestroy(&pWal->mutex); - taosMemoryFree(pWal); - pWal = NULL; + taosMemoryFreeClear(pWal); + return NULL; } int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { - if (pWal == NULL) return TSDB_CODE_APP_ERROR; + if (pWal == NULL) TAOS_RETURN(TSDB_CODE_APP_ERROR); if (pWal->cfg.level == pCfg->level && pWal->cfg.fsyncPeriod == pCfg->fsyncPeriod && pWal->cfg.retentionPeriod == pCfg->retentionPeriod && pWal->cfg.retentionSize == pCfg->retentionSize) { wDebug("vgId:%d, walLevel:%d fsync:%d walRetentionPeriod:%d walRetentionSize:%" PRId64 " not change", pWal->cfg.vgId, pWal->cfg.level, pWal->cfg.fsyncPeriod, pWal->cfg.retentionPeriod, pWal->cfg.retentionSize); - return 0; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } wInfo("vgId:%d, change old walLevel:%d fsync:%d walRetentionPeriod:%d walRetentionSize:%" PRId64 @@ -199,14 +201,17 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { pWal->fsyncSeq = pCfg->fsyncPeriod / 1000; if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } int32_t walPersist(SWal *pWal) { + int32_t code = 0; + taosThreadMutexLock(&pWal->mutex); - int32_t ret = walSaveMeta(pWal); + code = walSaveMeta(pWal); taosThreadMutexUnlock(&pWal->mutex); - return ret; + + TAOS_RETURN(code); } void walClose(SWal *pWal) { @@ -301,14 +306,14 @@ static int32_t walCreateThread() { if (taosThreadCreate(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) { wError("failed to create wal thread since %s", strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } taosThreadAttrDestroy(&thAttr); wDebug("wal thread is launched, thread:0x%08" PRIx64, taosGetPthreadId(tsWal.thread)); - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } static void walStopThread() {