use new macros for wal meta
This commit is contained in:
parent
ecc4b50a28
commit
ac8e072275
|
@ -153,17 +153,17 @@ static inline void walResetVer(SWalVer* pVer) {
|
|||
pVer->lastVer = -1;
|
||||
}
|
||||
|
||||
int walLoadMeta(SWal* pWal);
|
||||
int walSaveMeta(SWal* pWal);
|
||||
int walRemoveMeta(SWal* pWal);
|
||||
int walRollFileInfo(SWal* pWal);
|
||||
int32_t walLoadMeta(SWal* pWal);
|
||||
int32_t walSaveMeta(SWal* pWal);
|
||||
int32_t walRemoveMeta(SWal* pWal);
|
||||
int32_t walRollFileInfo(SWal* pWal);
|
||||
|
||||
int walCheckAndRepairMeta(SWal* pWal);
|
||||
int32_t walCheckAndRepairMeta(SWal* pWal);
|
||||
|
||||
int walCheckAndRepairIdx(SWal* pWal);
|
||||
int32_t walCheckAndRepairIdx(SWal* pWal);
|
||||
|
||||
char* walMetaSerialize(SWal* pWal);
|
||||
int walMetaDeserialize(SWal* pWal, const char* bytes);
|
||||
int32_t walMetaSerialize(SWal* pWal, char** serialized);
|
||||
int32_t walMetaDeserialize(SWal* pWal, const char* bytes);
|
||||
// meta section end
|
||||
|
||||
int64_t walGetSeq();
|
||||
|
|
|
@ -46,9 +46,13 @@ static FORCE_INLINE int walBuildTmpMetaName(SWal* pWal, char* buf) {
|
|||
return sprintf(buf, "%s/meta-ver.tmp", pWal->path);
|
||||
}
|
||||
|
||||
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
|
||||
static FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t* lastVer) {
|
||||
int32_t code = 0, lino = 0;
|
||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
int64_t retVer = -1;
|
||||
void* ptr = NULL;
|
||||
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
|
||||
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
|
||||
|
||||
|
@ -58,8 +62,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
|
|||
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
|
||||
if (pFile == NULL) {
|
||||
wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
*lastVer = retVer;
|
||||
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||
}
|
||||
|
||||
// ensure size as non-negative
|
||||
|
@ -73,7 +77,6 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
|
|||
int64_t readSize = 0;
|
||||
char* buf = NULL;
|
||||
int64_t offset = TMIN(pFileInfo->fileSize, fileSize);
|
||||
int64_t retVer = -1;
|
||||
int64_t lastEntryBeginOffset = 0;
|
||||
int64_t lastEntryEndOffset = 0;
|
||||
int64_t recordLen = 0;
|
||||
|
@ -94,25 +97,22 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
|
|||
readSize = end - offset;
|
||||
capacity = readSize + sizeof(magic);
|
||||
|
||||
void* ptr = taosMemoryRealloc(buf, capacity);
|
||||
ptr = taosMemoryRealloc(buf, capacity);
|
||||
if (ptr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
|
||||
}
|
||||
buf = ptr;
|
||||
|
||||
int64_t ret = taosLSeekFile(pFile, offset, SEEK_SET);
|
||||
if (ret < 0) {
|
||||
wError("vgId:%d, failed to lseek file due to %s. offset:%" PRId64 "", pWal->cfg.vgId, strerror(errno), offset);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
|
||||
if (readSize != taosReadFile(pFile, buf, readSize)) {
|
||||
wError("vgId:%d, failed to read file due to %s. readSize:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno),
|
||||
readSize, fnameStr);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
|
||||
char* candidate = NULL;
|
||||
|
@ -163,8 +163,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
|
|||
capacity += extraSize;
|
||||
void* ptr = taosMemoryRealloc(buf, capacity);
|
||||
if (ptr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
|
||||
}
|
||||
buf = ptr;
|
||||
}
|
||||
|
@ -211,7 +210,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
|
|||
}
|
||||
|
||||
if (retVer < 0) {
|
||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||
code = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||
}
|
||||
|
||||
// truncate file
|
||||
|
@ -221,30 +220,26 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
|
|||
|
||||
if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) {
|
||||
wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
|
||||
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pFile) < 0) {
|
||||
wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
}
|
||||
|
||||
pFileInfo->fileSize = lastEntryEndOffset;
|
||||
|
||||
taosCloseFile(&pFile);
|
||||
taosMemoryFree(buf);
|
||||
return retVer;
|
||||
|
||||
_err:
|
||||
taosCloseFile(&pFile);
|
||||
taosMemoryFree(buf);
|
||||
return -1;
|
||||
*lastVer = retVer;
|
||||
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static void walRebuildFileInfoSet(SArray* metaLogList, SArray* actualLogList) {
|
||||
static int32_t walRebuildFileInfoSet(SArray* metaLogList, SArray* actualLogList) {
|
||||
int metaFileNum = taosArrayGetSize(metaLogList);
|
||||
int actualFileNum = taosArrayGetSize(actualLogList);
|
||||
int j = 0;
|
||||
|
@ -270,10 +265,14 @@ static void walRebuildFileInfoSet(SArray* metaLogList, SArray* actualLogList) {
|
|||
|
||||
for (int i = 0; i < actualFileNum; i++) {
|
||||
SWalFileInfo* pFileInfo = taosArrayGet(actualLogList, i);
|
||||
taosArrayPush(metaLogList, pFileInfo);
|
||||
if (NULL == taosArrayPush(metaLogList, pFileInfo)) {
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void walAlignVersions(SWal* pWal) {
|
||||
if (pWal->vers.firstVer > pWal->vers.snapshotVer + 1) {
|
||||
wWarn("vgId:%d, firstVer:%" PRId64 " is larger than snapshotVer:%" PRId64 " + 1. align with it.", pWal->cfg.vgId,
|
||||
|
@ -294,7 +293,7 @@ static void walAlignVersions(SWal* pWal) {
|
|||
wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer);
|
||||
}
|
||||
|
||||
static int walRepairLogFileTs(SWal* pWal, bool* updateMeta) {
|
||||
static int32_t walRepairLogFileTs(SWal* pWal, bool* updateMeta) {
|
||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
int32_t fileIdx = -1;
|
||||
int32_t lastCloseTs = 0;
|
||||
|
@ -310,9 +309,9 @@ static int walRepairLogFileTs(SWal* pWal, bool* updateMeta) {
|
|||
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
|
||||
int32_t mtime = 0;
|
||||
if (taosStatFile(fnameStr, NULL, &mtime, NULL) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, failed to stat file due to %s, file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
|
||||
return -1;
|
||||
|
||||
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||
}
|
||||
|
||||
if (updateMeta != NULL) *updateMeta = true;
|
||||
|
@ -321,10 +320,10 @@ static int walRepairLogFileTs(SWal* pWal, bool* updateMeta) {
|
|||
lastCloseTs = pFileInfo->closeTs;
|
||||
}
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
static bool walLogEntriesComplete(const SWal* pWal) {
|
||||
static int32_t walLogEntriesComplete(const SWal* pWal) {
|
||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
bool complete = true;
|
||||
int32_t fileIdx = -1;
|
||||
|
@ -346,13 +345,13 @@ static bool walLogEntriesComplete(const SWal* pWal) {
|
|||
wError("vgId:%d, WAL log entries incomplete in range [%" PRId64 ", %" PRId64 "], index:%" PRId64
|
||||
", snaphotVer:%" PRId64,
|
||||
pWal->cfg.vgId, pWal->vers.firstVer, pWal->vers.lastVer, index, pWal->vers.snapshotVer);
|
||||
terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
|
||||
TAOS_RETURN(TSDB_CODE_WAL_LOG_INCOMPLETE);
|
||||
} else {
|
||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
}
|
||||
|
||||
return complete;
|
||||
}
|
||||
|
||||
static int walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
|
||||
static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
|
||||
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
|
||||
ASSERT(pFileInfo != NULL);
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
|
@ -364,13 +363,12 @@ static int walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
|
|||
int64_t lastEndOffset = records * sizeof(SWalIdxEntry);
|
||||
|
||||
if (fileSize <= lastEndOffset) {
|
||||
return 0;
|
||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||
}
|
||||
|
||||
wInfo("vgId:%d, trim idx file. file: %s, size: %" PRId64 ", offset: %" PRId64, pWal->cfg.vgId, fnameStr, fileSize,
|
||||
|
@ -378,11 +376,13 @@ static int walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
|
|||
|
||||
taosFtruncateFile(pFile, lastEndOffset);
|
||||
taosCloseFile(&pFile);
|
||||
return 0;
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
int walCheckAndRepairMeta(SWal* pWal) {
|
||||
int32_t walCheckAndRepairMeta(SWal* pWal) {
|
||||
// load log files, get first/snapshot/last version info
|
||||
int32_t code = 0;
|
||||
const char* logPattern = "^[0-9]+.log$";
|
||||
const char* idxPattern = "^[0-9]+.idx$";
|
||||
regex_t logRegPattern;
|
||||
|
@ -396,7 +396,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
|||
regfree(&logRegPattern);
|
||||
regfree(&idxRegPattern);
|
||||
wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
|
||||
return -1;
|
||||
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||
}
|
||||
|
||||
SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
|
||||
|
@ -428,8 +428,11 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
|||
bool updateMeta = (metaFileNum != actualFileNum);
|
||||
|
||||
// rebuild meta of file info
|
||||
walRebuildFileInfoSet(pWal->fileInfoSet, actualLog);
|
||||
code = walRebuildFileInfoSet(pWal->fileInfoSet, actualLog);
|
||||
if (code) {
|
||||
taosArrayDestroy(actualLog);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
|
||||
|
@ -444,9 +447,9 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
|||
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
|
||||
int32_t code = taosStatFile(fnameStr, &fileSize, NULL, NULL);
|
||||
if (code < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("failed to stat file since %s. file:%s", terrstr(), fnameStr);
|
||||
return -1;
|
||||
|
||||
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||
}
|
||||
|
||||
if (pFileInfo->lastVer >= pFileInfo->firstVer && fileSize == pFileInfo->fileSize) {
|
||||
|
@ -455,16 +458,20 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
|||
}
|
||||
updateMeta = true;
|
||||
|
||||
(void)walTrimIdxFile(pWal, fileIdx);
|
||||
TAOS_CHECK_RETURN(walTrimIdxFile(pWal, fileIdx));
|
||||
|
||||
int64_t lastVer = walScanLogGetLastVer(pWal, fileIdx);
|
||||
int64_t lastVer = -1;
|
||||
code = walScanLogGetLastVer(pWal, fileIdx, &lastVer);
|
||||
if (lastVer < 0) {
|
||||
if (terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
||||
if (code != TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
||||
wError("failed to scan wal last ver since %s", terrstr());
|
||||
return -1;
|
||||
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
// empty log file
|
||||
lastVer = pFileInfo->firstVer - 1;
|
||||
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// update lastVer
|
||||
|
@ -484,42 +491,36 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
|||
walAlignVersions(pWal);
|
||||
|
||||
// repair ts of files
|
||||
if (walRepairLogFileTs(pWal, &updateMeta) < 0) {
|
||||
return -1;
|
||||
}
|
||||
TAOS_CHECK_RETURN(walRepairLogFileTs(pWal, &updateMeta));
|
||||
|
||||
// update meta file
|
||||
if (updateMeta) {
|
||||
(void)walSaveMeta(pWal);
|
||||
TAOS_CHECK_RETURN(walSaveMeta(pWal));
|
||||
}
|
||||
|
||||
if (!walLogEntriesComplete(pWal)) {
|
||||
return -1;
|
||||
TAOS_CHECK_RETURN(walLogEntriesComplete(pWal));
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) {
|
||||
static int32_t walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) {
|
||||
if (taosLSeekFile(pLogFile, offset, SEEK_SET) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||
}
|
||||
|
||||
if (taosReadFile(pLogFile, pCkHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||
}
|
||||
|
||||
if (walValidHeadCksum(pCkHead) != 0) {
|
||||
terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH;
|
||||
return -1;
|
||||
TAOS_RETURN(TSDB_CODE_WAL_CHKSUM_MISMATCH);
|
||||
}
|
||||
|
||||
return 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
||||
static int32_t walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
||||
int32_t code = 0, lino = 0;
|
||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
|
@ -530,12 +531,12 @@ static int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
|||
|
||||
if (taosStatFile(fnameStr, &fileSize, NULL, NULL) < 0 && errno != ENOENT) {
|
||||
wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
|
||||
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||
}
|
||||
|
||||
if (fileSize == (pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) {
|
||||
return 0;
|
||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
// start to repair
|
||||
|
@ -550,15 +551,15 @@ static int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
|||
pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE);
|
||||
if (pIdxFile == NULL) {
|
||||
wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
|
||||
pLogFile = taosOpenFile(fLogNameStr, TD_FILE_READ);
|
||||
if (pLogFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fLogNameStr, terrstr());
|
||||
goto _err;
|
||||
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
|
||||
// determine the last valid entry end, i.e. offset
|
||||
|
@ -566,15 +567,15 @@ static int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
|||
if (taosLSeekFile(pIdxFile, offset, SEEK_SET) < 0) {
|
||||
wError("vgId:%d, failed to seek file due to %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno),
|
||||
offset, fnameStr);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
|
||||
if (taosReadFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
|
||||
wError("vgId:%d, failed to read file due to %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno),
|
||||
offset, fnameStr);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
|
||||
if (idxEntry.ver > pFileInfo->lastVer) {
|
||||
|
@ -602,19 +603,19 @@ static int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
|||
// ftruncate idx file
|
||||
if (offset < fileSize) {
|
||||
if (taosFtruncateFile(pIdxFile, offset) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, failed to ftruncate file since %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, terrstr(),
|
||||
offset, fnameStr);
|
||||
goto _err;
|
||||
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
}
|
||||
|
||||
// rebuild idx file
|
||||
if (taosLSeekFile(pIdxFile, 0, SEEK_END) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, failed to seek file since %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, terrstr(), offset,
|
||||
fnameStr);
|
||||
goto _err;
|
||||
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
|
||||
int64_t count = 0;
|
||||
|
@ -630,23 +631,25 @@ static int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
|||
}
|
||||
idxEntry.offset += sizeof(SWalCkHead) + cryptedBodyLen;
|
||||
|
||||
if (walReadLogHead(pLogFile, idxEntry.offset, &ckHead) < 0) {
|
||||
code = walReadLogHead(pLogFile, idxEntry.offset, &ckHead);
|
||||
if (code) {
|
||||
wError("vgId:%d, failed to read wal log head since %s. index:%" PRId64 ", offset:%" PRId64 ", file:%s",
|
||||
pWal->cfg.vgId, terrstr(), idxEntry.ver, idxEntry.offset, fLogNameStr);
|
||||
goto _err;
|
||||
|
||||
TAOS_CHECK_GOTO(code, &lino, _err);
|
||||
}
|
||||
if (pWal->cfg.level != TAOS_WAL_SKIP && taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, failed to append file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
|
||||
goto _err;
|
||||
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
count++;
|
||||
}
|
||||
|
||||
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pIdxFile) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, faild to fsync file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
|
||||
goto _err;
|
||||
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
|
||||
if (count > 0) {
|
||||
|
@ -654,14 +657,15 @@ static int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
|||
pFileInfo->lastVer);
|
||||
}
|
||||
|
||||
(void)taosCloseFile(&pLogFile);
|
||||
(void)taosCloseFile(&pIdxFile);
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
if (code) {
|
||||
wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
|
||||
}
|
||||
|
||||
(void)taosCloseFile(&pLogFile);
|
||||
(void)taosCloseFile(&pIdxFile);
|
||||
return -1;
|
||||
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int64_t walGetVerRetention(SWal* pWal, int64_t bytes) {
|
||||
|
@ -681,19 +685,24 @@ int64_t walGetVerRetention(SWal* pWal, int64_t bytes) {
|
|||
return ver + 1;
|
||||
}
|
||||
|
||||
int walCheckAndRepairIdx(SWal* pWal) {
|
||||
int32_t walCheckAndRepairIdx(SWal* pWal) {
|
||||
int32_t code = 0;
|
||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
int32_t fileIdx = sz;
|
||||
|
||||
while (--fileIdx >= 0) {
|
||||
if (walCheckAndRepairIdxFile(pWal, fileIdx) < 0) {
|
||||
code = walCheckAndRepairIdxFile(pWal, fileIdx);
|
||||
if (code) {
|
||||
wError("vgId:%d, failed to repair idx file since %s. fileIdx:%d", pWal->cfg.vgId, terrstr(), fileIdx);
|
||||
return -1;
|
||||
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int walRollFileInfo(SWal* pWal) {
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t walRollFileInfo(SWal* pWal) {
|
||||
int64_t ts = taosGetTimestampSec();
|
||||
|
||||
SArray* pArray = pWal->fileInfoSet;
|
||||
|
@ -717,10 +726,11 @@ int walRollFileInfo(SWal* pWal) {
|
|||
pNewInfo->syncedOffset = 0;
|
||||
taosArrayPush(pArray, pNewInfo);
|
||||
taosMemoryFree(pNewInfo);
|
||||
return 0;
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
char* walMetaSerialize(SWal* pWal) {
|
||||
int32_t walMetaSerialize(SWal* pWal, char** serialized) {
|
||||
char buf[30];
|
||||
int sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
|
@ -737,8 +747,8 @@ char* walMetaSerialize(SWal* pWal) {
|
|||
if (pFiles) {
|
||||
cJSON_Delete(pFiles);
|
||||
}
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
cJSON_AddItemToObject(pRoot, "meta", pMeta);
|
||||
sprintf(buf, "%" PRId64, pWal->vers.firstVer);
|
||||
|
@ -757,7 +767,8 @@ char* walMetaSerialize(SWal* pWal) {
|
|||
cJSON_AddItemToArray(pFiles, pField = cJSON_CreateObject());
|
||||
if (pField == NULL) {
|
||||
cJSON_Delete(pRoot);
|
||||
return NULL;
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
// cjson only support int32_t or double
|
||||
// string are used to prohibit the loss of precision
|
||||
|
@ -772,12 +783,15 @@ char* walMetaSerialize(SWal* pWal) {
|
|||
sprintf(buf, "%" PRId64, pInfo->fileSize);
|
||||
cJSON_AddStringToObject(pField, "fileSize", buf);
|
||||
}
|
||||
char* serialized = cJSON_Print(pRoot);
|
||||
char* pSerialized = cJSON_Print(pRoot);
|
||||
cJSON_Delete(pRoot);
|
||||
return serialized;
|
||||
|
||||
*serialized = pSerialized;
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
int walMetaDeserialize(SWal* pWal, const char* bytes) {
|
||||
int32_t walMetaDeserialize(SWal* pWal, const char* bytes) {
|
||||
/*A(taosArrayGetSize(pWal->fileInfoSet) == 0);*/
|
||||
cJSON *pRoot, *pMeta, *pFiles, *pInfoJson, *pField;
|
||||
pRoot = cJSON_Parse(bytes);
|
||||
|
@ -829,11 +843,11 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
|
|||
pWal->fileInfoSet = pArray;
|
||||
pWal->writeCur = sz - 1;
|
||||
cJSON_Delete(pRoot);
|
||||
return 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_err:
|
||||
cJSON_Delete(pRoot);
|
||||
return -1;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static int walFindCurMetaVer(SWal* pWal) {
|
||||
|
@ -872,8 +886,8 @@ static void walUpdateSyncedOffset(SWal* pWal) {
|
|||
pFileInfo->syncedOffset = pFileInfo->fileSize;
|
||||
}
|
||||
|
||||
int walSaveMeta(SWal* pWal) {
|
||||
int code = 0;
|
||||
int32_t walSaveMeta(SWal* pWal) {
|
||||
int code = 0, lino = 0;
|
||||
int metaVer = walFindCurMetaVer(pWal);
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
char tmpFnameStr[WAL_FILE_LEN];
|
||||
|
@ -909,37 +923,37 @@ int walSaveMeta(SWal* pWal) {
|
|||
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||
}
|
||||
|
||||
char* serialized = walMetaSerialize(pWal);
|
||||
char* serialized = NULL;
|
||||
TAOS_CHECK_RETURN(walMetaSerialize(pWal, &serialized));
|
||||
int len = strlen(serialized);
|
||||
if (pWal->cfg.level != TAOS_WAL_SKIP && len != taosWriteFile(pMetaFile, serialized, len)) {
|
||||
wError("vgId:%d, failed to write file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
|
||||
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pMetaFile) < 0) {
|
||||
wError("vgId:%d, failed to sync file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
|
||||
if (taosCloseFile(&pMetaFile) < 0) {
|
||||
wError("vgId:%d, failed to close file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
|
||||
// rename it
|
||||
n = walBuildMetaName(pWal, metaVer + 1, fnameStr);
|
||||
if (n >= sizeof(fnameStr)) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
goto _err;
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _err);
|
||||
}
|
||||
|
||||
if (taosRenameFile(tmpFnameStr, fnameStr) < 0) {
|
||||
wError("failed to rename file due to %s. dest:%s", strerror(errno), fnameStr);
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
|
||||
}
|
||||
|
||||
// delete old file
|
||||
|
@ -957,12 +971,14 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
int walLoadMeta(SWal* pWal) {
|
||||
int32_t walLoadMeta(SWal* pWal) {
|
||||
int32_t code = 0;
|
||||
// find existing meta file
|
||||
int metaVer = walFindCurMetaVer(pWal);
|
||||
if (metaVer == -1) {
|
||||
wDebug("vgId:%d, wal find meta ver %d", pWal->cfg.vgId, metaVer);
|
||||
return -1;
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||
}
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildMetaName(pWal, metaVer, fnameStr);
|
||||
|
@ -972,39 +988,40 @@ int walLoadMeta(SWal* pWal) {
|
|||
if (fileSize == 0) {
|
||||
(void)taosRemoveFile(fnameStr);
|
||||
wDebug("vgId:%d, wal find empty meta ver %d", pWal->cfg.vgId, metaVer);
|
||||
return -1;
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||
}
|
||||
int size = (int)fileSize;
|
||||
char* buf = taosMemoryMalloc(size + 5);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
memset(buf, 0, size + 5);
|
||||
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
||||
if (pFile == NULL) {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
taosMemoryFree(buf);
|
||||
return -1;
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
|
||||
}
|
||||
if (taosReadFile(pFile, buf, size) != size) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
taosCloseFile(&pFile);
|
||||
taosMemoryFree(buf);
|
||||
return -1;
|
||||
|
||||
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||
}
|
||||
// load into fileInfoSet
|
||||
int code = walMetaDeserialize(pWal, buf);
|
||||
code = walMetaDeserialize(pWal, buf);
|
||||
if (code < 0) {
|
||||
wError("failed to deserialize wal meta. file:%s", fnameStr);
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
code = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
taosCloseFile(&pFile);
|
||||
taosMemoryFree(buf);
|
||||
return code;
|
||||
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int walRemoveMeta(SWal* pWal) {
|
||||
int32_t walRemoveMeta(SWal* pWal) {
|
||||
int metaVer = walFindCurMetaVer(pWal);
|
||||
if (metaVer == -1) return 0;
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
|
|
|
@ -171,7 +171,9 @@ TEST_F(WalCleanEnv, serialize) {
|
|||
ASSERT(code == 0);
|
||||
code = walRollFileInfo(pWal);
|
||||
ASSERT(code == 0);
|
||||
char* ss = walMetaSerialize(pWal);
|
||||
char* ss = NULL;
|
||||
code = walMetaSerialize(pWal, &ss);
|
||||
ASSERT(code == 0);
|
||||
printf("%s\n", ss);
|
||||
taosMemoryFree(ss);
|
||||
code = walSaveMeta(pWal);
|
||||
|
@ -206,7 +208,9 @@ TEST_F(WalKeepEnv, readOldMeta) {
|
|||
ASSERT_EQ(code, TSDB_CODE_WAL_INVALID_VER);
|
||||
ASSERT_EQ(pWal->vers.lastVer, i);
|
||||
}
|
||||
char* oldss = walMetaSerialize(pWal);
|
||||
char* oldss = NULL;
|
||||
code = walMetaSerialize(pWal, &oldss);
|
||||
ASSERT(code == 0);
|
||||
|
||||
TearDown();
|
||||
SetUp();
|
||||
|
@ -214,7 +218,9 @@ TEST_F(WalKeepEnv, readOldMeta) {
|
|||
ASSERT_EQ(pWal->vers.firstVer, 0);
|
||||
ASSERT_EQ(pWal->vers.lastVer, 9);
|
||||
|
||||
char* newss = walMetaSerialize(pWal);
|
||||
char* newss = NULL;
|
||||
code = walMetaSerialize(pWal, &newss);
|
||||
ASSERT(code == 0);
|
||||
|
||||
int len = strlen(oldss);
|
||||
ASSERT_EQ(len, strlen(newss));
|
||||
|
|
Loading…
Reference in New Issue