diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 94346d705e..4d5b4d977a 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -53,45 +53,63 @@ typedef struct { EWalType walLevel; // wal level } SWalCfg; -#define WAL_PREFIX "wal" -#define WAL_PREFIX_LEN 3 -#define WAL_REFRESH_MS 1000 -#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16) -#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) -#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) -#define WAL_FILE_LEN (WAL_PATH_LEN + 32) -#define WAL_FILE_NUM 1 // 3 +#define WAL_PREFIX "wal" +#define WAL_LOG_SUFFIX "log" +#define WAL_INDEX_SUFFIX "idx" +#define WAL_PREFIX_LEN 3 +#define WAL_REFRESH_MS 1000 +#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16) +#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFEUL)) +#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) +#define WAL_FILE_LEN (WAL_PATH_LEN + 32) +//#define WAL_FILE_NUM 1 // 3 + +#define WAL_CUR_POS_READ_ONLY 1 +#define WAL_CUR_FILE_READ_ONLY 2 typedef struct SWal { - int64_t version; - int64_t fileId; - int64_t rId; - int64_t tfd; - int32_t vgId; - int32_t keep; - int32_t level; - int32_t fsyncPeriod; + // cfg + int32_t vgId; + int32_t fsyncPeriod; // millisecond + EWalType level; + //reference + int64_t refId; + //current tfd + int64_t curLogTfd; + int64_t curIdxTfd; + //current version + int64_t curVersion; + int64_t curOffset; + //current file version + int64_t curFileFirstVersion; + int64_t curFileLastVersion; + //wal fileset version + int64_t firstVersion; + int64_t snapshotVersion; + int64_t lastVersion; + //fsync status int32_t fsyncSeq; - int8_t stop; - int8_t reseved[3]; - char path[WAL_PATH_LEN]; - char name[WAL_FILE_LEN]; + //ctl + int32_t curStatus; pthread_mutex_t mutex; + //path + char path[WAL_PATH_LEN]; } SWal; // WAL HANDLE -typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, void *pMsg); +typedef int32_t (*FWalWrite)(void *ahandle, void *pHead); // module initialization int32_t walInit(); void 
walCleanUp(); // handle open and ctl -SWal *walOpen(char *path, SWalCfg *pCfg); +SWal *walOpen(const char *path, SWalCfg *pCfg); +void walStop(SWal *pWal); int32_t walAlter(SWal *, SWalCfg *pCfg); void walClose(SWal *); // write -// int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen); +//int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen); int64_t walWrite(SWal *, int64_t index, void *body, int32_t bodyLen); int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize); @@ -101,7 +119,8 @@ int32_t walCommit(SWal *, int64_t ver); // truncate after int32_t walRollback(SWal *, int64_t ver); // notify that previous log can be pruned safely -int32_t walPrune(SWal *, int64_t ver); +int32_t walTakeSnapshot(SWal *, int64_t ver); +//int32_t walDataCorrupted(SWal*); // read int32_t walRead(SWal *, SWalHead **, int64_t ver); @@ -111,7 +130,6 @@ int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readN int64_t walGetFirstVer(SWal *); int64_t walGetSnapshotVer(SWal *); int64_t walGetLastVer(SWal *); -// int32_t walDataCorrupted(SWal*); //internal int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index f5f944b12b..42ede49c6b 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -22,6 +22,9 @@ extern "C" { #endif +int walRotate(SWal* pWal); +int walGetFile(SWal* pWal, int32_t version); + #ifdef __cplusplus } #endif diff --git a/source/libs/wal/src/wal.c b/source/libs/wal/src/wal.c index 05d81e0867..59f9c48814 100644 --- a/source/libs/wal/src/wal.c +++ b/source/libs/wal/src/wal.c @@ -23,11 +23,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) { return 0; } -int32_t walPrune(SWal *pWal, int64_t ver) { +int32_t walTakeSnapshot(SWal *pWal, int64_t ver) { return 0; } - int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { return 0; } diff --git 
a/source/libs/wal/src/walIndex.c b/source/libs/wal/src/walIndex.c new file mode 100644 index 0000000000..e1fa8c72dd --- /dev/null +++ b/source/libs/wal/src/walIndex.c @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "tref.h" +#include "tfile.h" +#include "walInt.h" + +int walSetCurVerImpl(SWal *pWal, int64_t ver) { + //close old file + //iterate all files + //open right file + //set cur version, cur file version and cur status + return 0; +} + +int walSetCurVer(SWal *pWal, int64_t ver) { + if(ver > pWal->lastVersion + 1) { + //TODO: some records are skipped + return -1; + } + if(ver < pWal->firstVersion) { + //TODO: try to seek pruned log + return -1; + } + if(ver < pWal->snapshotVersion) { + //TODO: seek snapshotted log + } + if(ver < pWal->curFileFirstVersion || (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) { + //back up to avoid inconsistency + int64_t curVersion = pWal->curVersion; + int64_t curOffset = pWal->curOffset; + int64_t curFileFirstVersion = pWal->curFileFirstVersion; + int64_t curFileLastVersion = pWal->curFileLastVersion; + if(walSetCurVerImpl(pWal, ver) < 0) { + //TODO: errno + pWal->curVersion = curVersion; + pWal->curOffset = curOffset; + pWal->curFileFirstVersion = curFileFirstVersion; + pWal->curFileLastVersion = curFileLastVersion; + return -1; + } + } + + return 0; +} + +int walWriteIndex(SWal 
*pWal, int64_t ver, int64_t offset) { + int code = 0; + //get index file + if(!tfValid(pWal->curIdxTfd)) { + code = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + } + if(pWal->curVersion != ver) { + if(walSetCurVer(pWal, ver) != 0) { + //TODO: some records are skipped + return -1; + } + } + //check file checksum + //append index + return 0; +} + +int walRotateIndex(SWal *pWal) { + //check file checksum + //create new file + //switch file + return 0; +} diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 2bc12b374c..4168c21a6e 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -21,7 +21,7 @@ #include "walInt.h" typedef struct { - int32_t refId; + int32_t refSetId; int32_t seq; int8_t stop; pthread_t thread; @@ -36,7 +36,7 @@ static void walFreeObj(void *pWal); int32_t walInit() { int32_t code = 0; - tsWal.refId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); + tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); code = pthread_mutex_init(&tsWal.mutex, NULL); if (code) { @@ -45,23 +45,23 @@ int32_t walInit() { } code = walCreateThread(); - if (code != TSDB_CODE_SUCCESS) { + if (code != 0) { wError("failed to init wal module since %s", tstrerror(code)); return code; } - wInfo("wal module is initialized, rsetId:%d", tsWal.refId); + wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId); return code; } void walCleanUp() { walStopThread(); - taosCloseRef(tsWal.refId); + taosCloseRef(tsWal.refSetId); pthread_mutex_destroy(&tsWal.mutex); wInfo("wal module is cleaned up"); } -SWal *walOpen(char *path, SWalCfg *pCfg) { +SWal *walOpen(const char *path, SWalCfg *pCfg) { SWal *pWal = malloc(sizeof(SWal)); if (pWal == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -69,10 +69,9 @@ SWal *walOpen(char *path, SWalCfg *pCfg) { } pWal->vgId = pCfg->vgId; - pWal->tfd = -1; - pWal->fileId = -1; + pWal->curLogTfd = -1; 
+ /*pWal->curFileId = -1;*/ pWal->level = pCfg->walLevel; - /*pWal->keep = pCfg->keep;*/ pWal->fsyncPeriod = pCfg->fsyncPeriod; tstrncpy(pWal->path, path, sizeof(pWal->path)); pthread_mutex_init(&pWal->mutex, NULL); @@ -80,13 +79,13 @@ SWal *walOpen(char *path, SWalCfg *pCfg) { pWal->fsyncSeq = pCfg->fsyncPeriod / 1000; if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; - if (walInitObj(pWal) != TSDB_CODE_SUCCESS) { + if (walInitObj(pWal) != 0) { walFreeObj(pWal); return NULL; } - pWal->rId = taosAddRef(tsWal.refId, pWal); - if (pWal->rId < 0) { + pWal->refId = taosAddRef(tsWal.refSetId, pWal); + if (pWal->refId < 0) { walFreeObj(pWal); return NULL; } @@ -102,7 +101,7 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) { wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->vgId, pWal->level, pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); - return TSDB_CODE_SUCCESS; + return 0; } wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->vgId, pWal->level, @@ -113,26 +112,16 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { pWal->fsyncSeq = pCfg->fsyncPeriod / 1000; if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; - return TSDB_CODE_SUCCESS; -} - -void walStop(void *handle) { - if (handle == NULL) return; - SWal *pWal = handle; - - pthread_mutex_lock(&pWal->mutex); - pWal->stop = 1; - pthread_mutex_unlock(&pWal->mutex); - wDebug("vgId:%d, stop write wal", pWal->vgId); + return 0; } void walClose(SWal *pWal) { if (pWal == NULL) return; pthread_mutex_lock(&pWal->mutex); - tfClose(pWal->tfd); + tfClose(pWal->curLogTfd); pthread_mutex_unlock(&pWal->mutex); - taosRemoveRef(tsWal.refId, pWal->rId); + taosRemoveRef(tsWal.refSetId, pWal->refId); } static int32_t walInitObj(SWal *pWal) { @@ -142,14 +131,14 @@ static int32_t walInitObj(SWal *pWal) { } wDebug("vgId:%d, object is initialized", pWal->vgId); - return TSDB_CODE_SUCCESS; + return 0; } 
static void walFreeObj(void *wal) { SWal *pWal = wal; wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); - tfClose(pWal->tfd); + tfClose(pWal->curLogTfd); pthread_mutex_destroy(&pWal->mutex); tfree(pWal); } @@ -174,16 +163,16 @@ static void walUpdateSeq() { } static void walFsyncAll() { - SWal *pWal = taosIterateRef(tsWal.refId, 0); + SWal *pWal = taosIterateRef(tsWal.refSetId, 0); while (pWal) { if (walNeedFsync(pWal)) { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); - int32_t code = tfFsync(pWal->tfd); + int32_t code = tfFsync(pWal->curLogTfd); if (code != 0) { - wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code)); + wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(code)); } } - pWal = taosIterateRef(tsWal.refId, pWal->rId); + pWal = taosIterateRef(tsWal.refSetId, pWal->refId); } } @@ -216,7 +205,7 @@ static int32_t walCreateThread() { pthread_attr_destroy(&thAttr); wDebug("wal thread is launched, thread:0x%08" PRIx64, taosGetPthreadId(tsWal.thread)); - return TSDB_CODE_SUCCESS; + return 0; } static void walStopThread() { diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 023b1c4a48..a8123f9c25 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -21,6 +21,7 @@ #include "tfile.h" #include "walInt.h" +#if 0 static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId); int32_t walRenew(void *handle) { @@ -29,16 +30,16 @@ int32_t walRenew(void *handle) { SWal * pWal = handle; int32_t code = 0; - if (pWal->stop) { - wDebug("vgId:%d, do not create a new wal file", pWal->vgId); - return 0; - } + /*if (pWal->stop) {*/ + /*wDebug("vgId:%d, do not create a new wal file", pWal->vgId);*/ + /*return 0;*/ + /*}*/ pthread_mutex_lock(&pWal->mutex); - if (tfValid(pWal->tfd)) { - tfClose(pWal->tfd); - 
wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->name); + if (tfValid(pWal->logTfd)) { + tfClose(pWal->logTfd); + wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->logName); } /*if (pWal->keep == TAOS_WAL_KEEP) {*/ @@ -48,14 +49,14 @@ int32_t walRenew(void *handle) { /*pWal->fileId++;*/ /*}*/ - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); - pWal->tfd = tfOpenCreateWrite(pWal->name); + snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId); + pWal->logTfd = tfOpenCreateWrite(pWal->logName); - if (!tfValid(pWal->tfd)) { + if (!tfValid(pWal->logTfd)) { code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno)); } else { - wDebug("vgId:%d, file:%s, it is created and open while renew", pWal->vgId, pWal->name); + wDebug("vgId:%d, file:%s, it is created and open while renew", pWal->vgId, pWal->logName); } pthread_mutex_unlock(&pWal->mutex); @@ -67,13 +68,13 @@ void walRemoveOneOldFile(void *handle) { SWal *pWal = handle; if (pWal == NULL) return; /*if (pWal->keep == TAOS_WAL_KEEP) return;*/ - if (!tfValid(pWal->tfd)) return; + if (!tfValid(pWal->logTfd)) return; pthread_mutex_lock(&pWal->mutex); // remove the oldest wal file int64_t oldFileId = -1; - if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) { + if (walGetOldFile(pWal, pWal->curFileId, WAL_FILE_NUM, &oldFileId) == 0) { char walName[WAL_FILE_LEN] = {0}; snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId); @@ -95,26 +96,24 @@ void walRemoveAllOldFiles(void *handle) { pthread_mutex_lock(&pWal->mutex); - tfClose(pWal->tfd); - wDebug("vgId:%d, file:%s, it is closed before remove all wals", pWal->vgId, pWal->name); + tfClose(pWal->logTfd); + wDebug("vgId:%d, 
file:%s, it is closed before remove all wals", pWal->vgId, pWal->logName); while (walGetNextFile(pWal, &fileId) >= 0) { - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); + snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); - if (remove(pWal->name) < 0) { - wError("vgId:%d, wal:%p file:%s, failed to remove since %s", pWal->vgId, pWal, pWal->name, strerror(errno)); + if (remove(pWal->logName) < 0) { + wError("vgId:%d, wal:%p file:%s, failed to remove since %s", pWal->vgId, pWal, pWal->logName, strerror(errno)); } else { - wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name); + wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->logName); } } pthread_mutex_unlock(&pWal->mutex); } - -#if defined(WAL_CHECKSUM_WHOLE) +#endif static void walUpdateChecksum(SWalHead *pHead) { pHead->sver = 2; - pHead->cksum = 0; pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(SWalHead) + pHead->len); } @@ -130,8 +129,6 @@ static int walValidateChecksum(SWalHead *pHead) { return 0; } -#endif - int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) { if (pWal == NULL) return -1; @@ -143,32 +140,27 @@ int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) { int32_t code = 0; // no wal - if (!tfValid(pWal->tfd)) return 0; + if (!tfValid(pWal->curLogTfd)) return 0; if (pWal->level == TAOS_WAL_NOLOG) return 0; - if (pHead->version <= pWal->version) return 0; + if (pHead->version <= pWal->curVersion) return 0; pHead->signature = WAL_SIGNATURE; pHead->len = bodyLen; memcpy(pHead->cont, body, bodyLen); -#if defined(WAL_CHECKSUM_WHOLE) walUpdateChecksum(pHead); -#else - pHead->sver = 0; - taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); -#endif int32_t contLen = pHead->len + sizeof(SWalHead); pthread_mutex_lock(&pWal->mutex); - if (tfWrite(pWal->tfd, pHead, contLen) != contLen) { + if 
(tfWrite(pWal->curLogTfd, pHead, contLen) != contLen) { code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); } else { - wTrace("vgId:%d, write wal, fileId:%" PRId64 " tfd:%" PRId64 " hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId, - pWal->fileId, pWal->tfd, pHead->version, pWal->version, pHead->len); - pWal->version = pHead->version; + /*wTrace("vgId:%d, write wal, fileId:%" PRId64 " tfd:%" PRId64 " hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId,*/ + /*pWal->curFileId, pWal->logTfd, pHead->version, pWal->curVersion, pHead->len);*/ + pWal->curVersion = pHead->version; } pthread_mutex_unlock(&pWal->mutex); @@ -179,16 +171,17 @@ int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) { } void walFsync(SWal *pWal, bool forceFsync) { - if (pWal == NULL || !tfValid(pWal->tfd)) return; + if (pWal == NULL || !tfValid(pWal->curLogTfd)) return; if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { - wTrace("vgId:%d, fileId:%" PRId64 ", do fsync", pWal->vgId, pWal->fileId); - if (tfFsync(pWal->tfd) < 0) { - wError("vgId:%d, fileId:%" PRId64 ", fsync failed since %s", pWal->vgId, pWal->fileId, strerror(errno)); + wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, pWal->curFileFirstVersion); + if (tfFsync(pWal->curLogTfd) < 0) { + wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); } } } +#if 0 int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { if (handle == NULL) return -1; @@ -198,10 +191,10 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { int64_t fileId = -1; while ((code = walGetNextFile(pWal, &fileId)) >= 0) { - if (fileId == pWal->fileId) continue; + /*if (fileId == pWal->curFileId) continue;*/ char 
walName[WAL_FILE_LEN]; - snprintf(walName, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); + snprintf(walName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); wInfo("vgId:%d, file:%s, will be restored", pWal->vgId, walName); code = walRestoreWalFile(pWal, pVnode, writeFp, walName, fileId); @@ -210,7 +203,7 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { continue; } - wInfo("vgId:%d, file:%s, restore success, wver:%" PRIu64, pWal->vgId, walName, pWal->version); + wInfo("vgId:%d, file:%s, restore success, wver:%" PRIu64, pWal->vgId, walName, pWal->curVersion); count++; } @@ -222,14 +215,14 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { return walRenew(pWal); } else { // open the existing WAL file in append mode - pWal->fileId = 0; - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); - pWal->tfd = tfOpenCreateWriteAppend(pWal->name); - if (!tfValid(pWal->tfd)) { - wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); + /*pWal->curFileId = 0;*/ + snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId); + pWal->logTfd = tfOpenCreateWriteAppend(pWal->logName); + if (!tfValid(pWal->logTfd)) { + wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } - wDebug("vgId:%d, file:%s, it is created and open while restore", pWal->vgId, pWal->name); + wDebug("vgId:%d, file:%s, it is created and open while restore", pWal->vgId, pWal->logName); } return TSDB_CODE_SUCCESS; @@ -246,14 +239,15 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { int32_t code = walGetNextFile(pWal, fileId); if (code >= 0) { sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId); - code = (*fileId == pWal->fileId) ? 0 : 1; + /*code = (*fileId == pWal->curFileId) ? 
0 : 1;*/ } - wDebug("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->fileId, *fileId); + wDebug("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->curFileId, *fileId); pthread_mutex_unlock(&(pWal->mutex)); return code; } +#endif static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) { tfFtruncate(tfd, offset); @@ -279,13 +273,6 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, continue; } -#if defined(WAL_CHECKSUM_WHOLE) - if (pHead->sver == 0 && walValidateChecksum(pHead)) { - wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos); - *offset = pos; - return TSDB_CODE_SUCCESS; - } - if (pHead->sver >= 1) { if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) { wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); @@ -298,15 +285,6 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, return TSDB_CODE_SUCCESS; } } - -#else - if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { - wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos); - *offset = pos; - return TSDB_CODE_SUCCESS; - } - -#endif } return TSDB_CODE_WAL_FILE_CORRUPTED; @@ -349,7 +327,6 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch break; } -#if defined(WAL_CHECKSUM_WHOLE) if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) { wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, pHead->version, pHead->len, offset); @@ -393,50 +370,15 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch } } -#else - if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { - wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, - 
pHead->version, pHead->len, offset); - code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); - if (code != TSDB_CODE_SUCCESS) { - walFtruncate(pWal, tfd, offset); - break; - } - } - - if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) { - wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, - pHead->version, pHead->len, offset); - code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); - if (code != TSDB_CODE_SUCCESS) { - walFtruncate(pWal, tfd, offset); - break; - } - } - - ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len); - if (ret < 0) { - wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno)); - code = TAOS_SYSTEM_ERROR(errno); - break; - } - - if (ret < pHead->len) { - wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len); - offset += sizeof(SWalHead); - continue; - } - -#endif offset = offset + sizeof(SWalHead) + pHead->len; wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64, - pWal->vgId, fileId, pHead->version, pWal->version, pHead->len, offset); + pWal->vgId, fileId, pHead->version, pWal->curVersion, pHead->len, offset); - pWal->version = pHead->version; + pWal->curVersion = pHead->version; // wInfo("writeFp: %ld", offset); - (*writeFp)(pVnode, pHead, NULL); + (*writeFp)(pVnode, pHead); } tfClose(tfd); @@ -449,7 +391,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch uint64_t walGetVersion(SWal *pWal) { if (pWal == NULL) return 0; - return pWal->version; + return pWal->curVersion; } // Wal version in slave (dnode1) must be reset. 
@@ -459,7 +401,7 @@ uint64_t walGetVersion(SWal *pWal) { void walResetVersion(SWal *pWal, uint64_t newVer) { if (pWal == NULL) return; - wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->version, newVer); + wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->curVersion, newVer); - pWal->version = newVer; + pWal->curVersion = newVer; }