fix: refactor walScanLogGetLastVer, walCheckAndRepairMeta, and walCheckAndRepairIdx.
* first trial: starting from the last fsynced position, search for the last entry in the contiguous range of valid WAL
* second trial: search backwards for the last valid entry before the last fsynced position
* reserve sufficient buffer space for computing CRC32 checksums, especially for the WAL record body
* rebuild meta info to resolve potential misalignment between the meta list and the actual log files
* retract commitIndex and appliedIndex to lastLogIndex if needed
* put an upper size limit on the possibly corrupted WAL range to be recovered
This commit is contained in:
parent
3af05070cf
commit
7fb827debd
|
@ -43,6 +43,7 @@ extern "C" {
|
||||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||||
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
|
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
|
||||||
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
||||||
|
#define WAL_RECOV_SIZE_LIMIT (100 * WAL_SCAN_BUF_SIZE)
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TAOS_WAL_WRITE = 1,
|
TAOS_WAL_WRITE = 1,
|
||||||
|
|
|
@ -450,6 +450,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_WAL_INVALID_VER TAOS_DEF_ERROR_CODE(0, 0x1003)
|
#define TSDB_CODE_WAL_INVALID_VER TAOS_DEF_ERROR_CODE(0, 0x1003)
|
||||||
#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004)
|
#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004)
|
||||||
#define TSDB_CODE_WAL_LOG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x1005)
|
#define TSDB_CODE_WAL_LOG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x1005)
|
||||||
|
#define TSDB_CODE_WAL_CHKSUM_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x1006)
|
||||||
|
|
||||||
// tfs
|
// tfs
|
||||||
#define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201)
|
#define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201)
|
||||||
|
|
|
@ -146,7 +146,7 @@ void taos_close(TAOS *taos) {
|
||||||
|
|
||||||
int taos_errno(TAOS_RES *res) {
|
int taos_errno(TAOS_RES *res) {
|
||||||
if (res == NULL || TD_RES_TMQ_META(res)) {
|
if (res == NULL || TD_RES_TMQ_META(res)) {
|
||||||
if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_QRY_NOT_READY;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,13 +154,12 @@ int taos_errno(TAOS_RES *res) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return ((SRequestObj *)res)->code == TSDB_CODE_RPC_REDIRECT ? TSDB_CODE_RPC_NETWORK_UNAVAIL
|
return ((SRequestObj *)res)->code == TSDB_CODE_RPC_REDIRECT ? TSDB_CODE_QRY_NOT_READY : ((SRequestObj *)res)->code;
|
||||||
: ((SRequestObj *)res)->code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const char *taos_errstr(TAOS_RES *res) {
|
const char *taos_errstr(TAOS_RES *res) {
|
||||||
if (res == NULL || TD_RES_TMQ_META(res)) {
|
if (res == NULL || TD_RES_TMQ_META(res)) {
|
||||||
if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_QRY_NOT_READY;
|
||||||
return (const char *)tstrerror(terrno);
|
return (const char *)tstrerror(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,7 +171,7 @@ const char *taos_errstr(TAOS_RES *res) {
|
||||||
if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) {
|
if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) {
|
||||||
return pRequest->msgBuf;
|
return pRequest->msgBuf;
|
||||||
} else {
|
} else {
|
||||||
return pRequest->code == TSDB_CODE_RPC_REDIRECT ? (const char *)tstrerror(TSDB_CODE_RPC_NETWORK_UNAVAIL)
|
return pRequest->code == TSDB_CODE_RPC_REDIRECT ? (const char *)tstrerror(TSDB_CODE_QRY_NOT_READY)
|
||||||
: (const char *)tstrerror(pRequest->code);
|
: (const char *)tstrerror(pRequest->code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,110 +43,177 @@ static FORCE_INLINE int walBuildTmpMetaName(SWal* pWal, char* buf) {
|
||||||
return sprintf(buf, "%s/meta-ver.tmp", pWal->path);
|
return sprintf(buf, "%s/meta-ver.tmp", pWal->path);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
|
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
|
||||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
if (sz <= 0) {
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
wError("No WAL log file found.");
|
ASSERT(fileIdx >= 0 && fileIdx < sz);
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz - 1);
|
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
|
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
|
||||||
|
|
||||||
int64_t fileSize = 0;
|
int64_t fileSize = 0;
|
||||||
taosStatFile(fnameStr, &fileSize, NULL);
|
taosStatFile(fnameStr, &fileSize, NULL);
|
||||||
int32_t readSize = TMIN(WAL_SCAN_BUF_SIZE, fileSize);
|
|
||||||
pLastFileInfo->fileSize = fileSize;
|
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
|
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
wError("failed to open file due to %s. file:%s", strerror(errno), fnameStr);
|
wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ensure size as non-negative
|
||||||
|
pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize);
|
||||||
|
|
||||||
uint64_t magic = WAL_MAGIC;
|
uint64_t magic = WAL_MAGIC;
|
||||||
|
int64_t walCkHeadSz = sizeof(SWalCkHead);
|
||||||
|
int64_t end = fileSize;
|
||||||
|
int64_t offset = 0;
|
||||||
|
int32_t capacity = 0;
|
||||||
|
int32_t readSize = 0;
|
||||||
|
char* buf = NULL;
|
||||||
|
char* found = NULL;
|
||||||
|
bool firstTrial = pFileInfo->fileSize < fileSize;
|
||||||
|
|
||||||
char* buf = taosMemoryMalloc(readSize + sizeof(uint64_t));
|
// search for the valid last WAL entry, e.g. block by block
|
||||||
if (buf == NULL) {
|
|
||||||
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t offset;
|
|
||||||
offset = taosLSeekFile(pFile, -readSize, SEEK_END);
|
|
||||||
if (readSize != taosReadFile(pFile, buf, readSize)) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t walCkHeadSz = sizeof(SWalCkHead);
|
|
||||||
char* found = NULL;
|
|
||||||
while (1) {
|
while (1) {
|
||||||
char* haystack = buf;
|
offset = (firstTrial) ? pFileInfo->fileSize : TMAX(0, end - WAL_SCAN_BUF_SIZE);
|
||||||
|
ASSERT(offset <= end);
|
||||||
|
readSize = end - offset;
|
||||||
|
capacity = readSize + sizeof(magic);
|
||||||
|
|
||||||
|
int64_t limit = WAL_RECOV_SIZE_LIMIT;
|
||||||
|
if (limit < readSize) {
|
||||||
|
wError("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64
|
||||||
|
", end:%" PRId64 ", file:%s",
|
||||||
|
pWal->cfg.vgId, limit, offset, end, fnameStr);
|
||||||
|
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* ptr = taosMemoryRealloc(buf, capacity);
|
||||||
|
if (ptr == NULL) {
|
||||||
|
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
buf = ptr;
|
||||||
|
|
||||||
|
int64_t ret = taosLSeekFile(pFile, offset, SEEK_SET);
|
||||||
|
if (ret < 0) {
|
||||||
|
wError("vgId:%d, failed to lseek file due to %s. offset:%" PRId64 "", pWal->cfg.vgId, strerror(errno), offset);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (readSize != taosReadFile(pFile, buf, readSize)) {
|
||||||
|
wError("vgId:%d, failed to read file due to %s. readSize:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno),
|
||||||
|
readSize, fnameStr);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
char* candidate = NULL;
|
char* candidate = NULL;
|
||||||
while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
|
char* haystack = buf;
|
||||||
// read and validate
|
|
||||||
|
while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(magic))) != NULL) {
|
||||||
|
// validate head
|
||||||
int64_t len = readSize - (candidate - buf);
|
int64_t len = readSize - (candidate - buf);
|
||||||
if (len < walCkHeadSz) {
|
if (len < walCkHeadSz) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
SWalCkHead* logContent = (SWalCkHead*)candidate;
|
SWalCkHead* logContent = (SWalCkHead*)candidate;
|
||||||
if (walValidHeadCksum(logContent) == 0 &&
|
if (walValidHeadCksum(logContent) != 0) {
|
||||||
walCkHeadSz + logContent->head.bodyLen <= len &&
|
wError("vgId:%d, failed to validate checksum of wal entry header. offset:% %" PRId64 ", file:%s",
|
||||||
walValidBodyCksum(logContent) == 0) {
|
((char*)(logContent)-buf), fnameStr);
|
||||||
found = candidate;
|
haystack = candidate + 1;
|
||||||
|
if (firstTrial) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// validate body
|
||||||
|
int64_t size = walCkHeadSz + logContent->head.bodyLen;
|
||||||
|
if (len < size) {
|
||||||
|
int64_t extraSize = size - len;
|
||||||
|
if (capacity < readSize + extraSize + sizeof(magic)) {
|
||||||
|
capacity += extraSize;
|
||||||
|
void* ptr = taosMemoryRealloc(buf, capacity);
|
||||||
|
if (ptr == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
buf = ptr;
|
||||||
|
}
|
||||||
|
int64_t ret = taosLSeekFile(pFile, offset + readSize, SEEK_SET);
|
||||||
|
if (ret < 0) {
|
||||||
|
wError("vgId:%d, failed to lseek file due to %s. offset:%" PRId64 "", pWal->cfg.vgId, strerror(errno),
|
||||||
|
offset);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (extraSize != taosReadFile(pFile, buf + readSize, extraSize)) {
|
||||||
|
wError("vgId:%d, failed to read file due to %s. offset:%" PRId64 ", extraSize:%" PRId64 ", file:%s",
|
||||||
|
pWal->cfg.vgId, strerror(errno), offset + readSize, extraSize, fnameStr);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (walValidBodyCksum(logContent) != 0) {
|
||||||
|
terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH;
|
||||||
|
wError("vgId:%d, failed to validate checksum of wal entry body. offset:% %" PRId64 ", file:%s",
|
||||||
|
((char*)(logContent)-buf), fnameStr);
|
||||||
|
haystack = candidate + 1;
|
||||||
|
if (firstTrial) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// found one
|
||||||
|
found = candidate;
|
||||||
haystack = candidate + 1;
|
haystack = candidate + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (found || offset == 0) break;
|
if (found || offset == 0) break;
|
||||||
|
|
||||||
// go backwards, i.e. by at most one WAL scan buf size
|
// go backwards, e.g. by at most one WAL scan buf size
|
||||||
offset = TMAX(0, offset - readSize + walCkHeadSz);
|
end = offset + walCkHeadSz - 1;
|
||||||
int64_t ret = taosLSeekFile(pFile, offset, SEEK_SET);
|
firstTrial = false;
|
||||||
if (ret < 0) {
|
|
||||||
wError("failed to lseek file due to %s. offset:%lld", strerror(errno), offset);
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
ASSERT(offset == ret);
|
|
||||||
if (readSize != taosReadFile(pFile, buf, readSize)) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (found == NULL) {
|
// determine end of last entry
|
||||||
wError("WAL log file corrupted: no valid WAL record found. file: %s", fnameStr);
|
SWalCkHead* lastEntry = (SWalCkHead*)found;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
int64_t retVer = -1;
|
||||||
taosRemoveFile(fnameStr);
|
int64_t lastEntryBeginOffset = 0;
|
||||||
wWarn("vgId:%d, remove corrupted WAL log file: %s", pWal->cfg.vgId, fnameStr);
|
int64_t lastEntryEndOffset = 0;
|
||||||
goto _err;
|
|
||||||
|
if (lastEntry == NULL) {
|
||||||
|
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||||
|
} else {
|
||||||
|
retVer = lastEntry->head.version;
|
||||||
|
lastEntryBeginOffset = offset + (int64_t)((char*)lastEntry - (char*)buf);
|
||||||
|
lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
// truncate file
|
// truncate file
|
||||||
SWalCkHead* lastEntry = (SWalCkHead*)found;
|
|
||||||
int64_t retVer = lastEntry->head.version;
|
|
||||||
int64_t lastEntryBeginOffset = offset + (int64_t)((char*)found - (char*)buf);
|
|
||||||
int64_t lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen;
|
|
||||||
if (lastEntryEndOffset != fileSize) {
|
if (lastEntryEndOffset != fileSize) {
|
||||||
wWarn("vgId:%d repair meta truncate file %s to %ld, orig size %ld", pWal->cfg.vgId, fnameStr, lastEntryEndOffset,
|
wWarn("vgId:%d, repair meta truncate file %s to %ld, orig size %ld", pWal->cfg.vgId, fnameStr, lastEntryEndOffset,
|
||||||
fileSize);
|
fileSize);
|
||||||
if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) {
|
if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) {
|
||||||
wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr);
|
wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
if (taosFsyncFile(pFile) < 0) {
|
if (taosFsyncFile(pFile) < 0) {
|
||||||
wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr);
|
wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = lastEntryEndOffset;
|
|
||||||
pWal->totSize -= (fileSize - lastEntryEndOffset);
|
|
||||||
}
|
}
|
||||||
|
pFileInfo->fileSize = lastEntryEndOffset;
|
||||||
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
|
@ -158,13 +225,69 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
 * Rebuild the meta list of WAL file infos from the list of log files actually found.
 *
 * For every entry of actualLogList that has a counterpart in metaLogList with the
 * same firstVer, the meta entry is copied over the actual entry. Afterwards
 * metaLogList is cleared and refilled with the entries of actualLogList, in order.
 *
 * metaLogList   - list of SWalFileInfo recorded in meta; overwritten on return
 * actualLogList - list of SWalFileInfo built from the log files; entries may be updated
 *
 * Both lists are expected to be sorted in ascending order.
 */
static void walRebuildFileInfoSet(SArray* metaLogList, SArray* actualLogList) {
  int nMeta = taosArrayGetSize(metaLogList);
  int nActual = taosArrayGetSize(actualLogList);
  int metaIdx = 0;

  // single merge pass; metaIdx only ever moves forward
  for (int logIdx = 0; logIdx < nActual; logIdx++) {
    SWalFileInfo* pActual = taosArrayGet(actualLogList, logIdx);

    for (; metaIdx < nMeta; metaIdx++) {
      SWalFileInfo* pMeta = taosArrayGet(metaLogList, metaIdx);
      ASSERT(pMeta != NULL);

      if (pMeta->firstVer > pActual->firstVer) {
        // no meta entry for this log file; keep this meta entry for the next log file
        break;
      }
      if (pMeta->firstVer == pActual->firstVer) {
        // matching meta entry found: take over its recorded info and consume it
        *pActual = *pMeta;
        metaIdx++;
        break;
      }
      // meta entry is older than this log file: skip it
    }
  }

  // replace the meta list content with the (possibly updated) actual list
  taosArrayClear(metaLogList);

  int pushIdx = 0;
  while (pushIdx < nActual) {
    SWalFileInfo* pInfo = taosArrayGet(actualLogList, pushIdx);
    taosArrayPush(metaLogList, pInfo);
    pushIdx++;
  }
}
|
||||||
|
|
||||||
|
/*
 * Align the version fields in pWal->vers with snapshotVer and with each other.
 *
 * - firstVer is lowered to snapshotVer if it is larger.
 * - lastVer, commitVer and appliedVer are each raised to snapshotVer if smaller.
 * - finally commitVer is capped at lastVer, and appliedVer at commitVer.
 *
 * A warning is logged for every field that gets reset against snapshotVer.
 */
void walAlignVersions(SWal* pWal) {
  // snapshotVer is only read here, never modified
  const int64_t snapVer = pWal->vers.snapshotVer;

  if (pWal->vers.firstVer > snapVer) {
    wWarn("vgId:%d, firstVer:%" PRId64 " is larger than snapshotVer:%" PRId64 ". reset it.", pWal->cfg.vgId,
          pWal->vers.firstVer, snapVer);
    pWal->vers.firstVer = snapVer;
  }

  if (pWal->vers.lastVer < snapVer) {
    wWarn("vgId:%d, lastVer:%" PRId64 " is less than snapshotVer:%" PRId64 ". reset it.", pWal->cfg.vgId,
          pWal->vers.lastVer, snapVer);
    pWal->vers.lastVer = snapVer;
  }

  if (pWal->vers.commitVer < snapVer) {
    wWarn("vgId:%d, commitVer:%" PRId64 " is less than snapshotVer:%" PRId64 ". reset it.", pWal->cfg.vgId,
          pWal->vers.commitVer, snapVer);
    pWal->vers.commitVer = snapVer;
  }

  if (pWal->vers.appliedVer < snapVer) {
    wWarn("vgId:%d, appliedVer:%" PRId64 " is less than snapshotVer:%" PRId64 ". reset it.", pWal->cfg.vgId,
          pWal->vers.appliedVer, snapVer);
    pWal->vers.appliedVer = snapVer;
  }

  // keep the ordering appliedVer <= commitVer <= lastVer
  if (pWal->vers.lastVer < pWal->vers.commitVer) {
    pWal->vers.commitVer = pWal->vers.lastVer;
  }
  if (pWal->vers.commitVer < pWal->vers.appliedVer) {
    pWal->vers.appliedVer = pWal->vers.commitVer;
  }
}
|
||||||
|
|
||||||
int walCheckAndRepairMeta(SWal* pWal) {
|
int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
// load log files, get first/snapshot/last version info
|
// load log files, get first/snapshot/last version info
|
||||||
const char* logPattern = "^[0-9]+.log$";
|
const char* logPattern = "^[0-9]+.log$";
|
||||||
const char* idxPattern = "^[0-9]+.idx$";
|
const char* idxPattern = "^[0-9]+.idx$";
|
||||||
regex_t logRegPattern;
|
regex_t logRegPattern;
|
||||||
regex_t idxRegPattern;
|
regex_t idxRegPattern;
|
||||||
bool fixed = false;
|
|
||||||
|
|
||||||
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
|
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
|
||||||
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
|
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
|
||||||
|
@ -198,222 +321,238 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
|
|
||||||
taosArraySort(actualLog, compareWalFileInfo);
|
taosArraySort(actualLog, compareWalFileInfo);
|
||||||
|
|
||||||
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
|
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
int actualFileNum = taosArrayGetSize(actualLog);
|
int actualFileNum = taosArrayGetSize(actualLog);
|
||||||
|
int64_t firstVerPrev = pWal->vers.firstVer;
|
||||||
|
int64_t lastVerPrev = pWal->vers.lastVer;
|
||||||
|
int64_t totSize = 0;
|
||||||
|
bool updateMeta = (metaFileNum != actualFileNum);
|
||||||
|
|
||||||
#if 0
|
// rebuild meta of file info
|
||||||
for (int32_t fileNo = actualFileNum - 1; fileNo >= 0; fileNo--) {
|
walRebuildFileInfoSet(pWal->fileInfoSet, actualLog);
|
||||||
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, fileNo);
|
taosArrayDestroy(actualLog);
|
||||||
|
|
||||||
|
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
|
ASSERT(sz == actualFileNum);
|
||||||
|
|
||||||
|
// scan and determine the lastVer
|
||||||
|
int32_t fileIdx = sz;
|
||||||
|
|
||||||
|
while (--fileIdx >= 0) {
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
|
int64_t fileSize = 0;
|
||||||
|
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
|
||||||
|
|
||||||
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
|
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
|
||||||
int64_t fileSize = 0;
|
int32_t code = taosStatFile(fnameStr, &fileSize, NULL);
|
||||||
taosStatFile(fnameStr, &fileSize, NULL);
|
if (code < 0) {
|
||||||
if (fileSize == 0) {
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
wError("failed to stat file since %s. file:%s", terrstr(), fnameStr);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pFileInfo->firstVer >= 0);
|
||||||
|
|
||||||
|
if (pFileInfo->lastVer >= pFileInfo->firstVer && fileSize == pFileInfo->fileSize) {
|
||||||
|
totSize += pFileInfo->fileSize;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
updateMeta = true;
|
||||||
|
|
||||||
|
int64_t lastVer = walScanLogGetLastVer(pWal, fileIdx);
|
||||||
|
if (lastVer < 0) {
|
||||||
|
if (terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
||||||
|
wError("failed to scan wal last ver since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
ASSERT(pFileInfo->fileSize == 0);
|
||||||
|
// remove the empty wal log, and its idx
|
||||||
taosRemoveFile(fnameStr);
|
taosRemoveFile(fnameStr);
|
||||||
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
|
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
|
||||||
taosRemoveFile(fnameStr);
|
taosRemoveFile(fnameStr);
|
||||||
taosArrayPop(pLogInfoArray);
|
// remove its meta entry
|
||||||
} else {
|
taosArrayRemove(pWal->fileInfoSet, fileIdx);
|
||||||
break;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update lastVer
|
||||||
|
pFileInfo->lastVer = lastVer;
|
||||||
|
totSize += pFileInfo->fileSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
actualFileNum = taosArrayGetSize(pLogInfoArray);
|
// reset vers info and so on
|
||||||
#endif
|
|
||||||
|
|
||||||
{
|
|
||||||
int32_t i = 0, j = 0;
|
|
||||||
while (i < actualFileNum && j < metaFileNum) {
|
|
||||||
SWalFileInfo* pActualFile = taosArrayGet(actualLog, i);
|
|
||||||
SWalFileInfo* pMetaFile = taosArrayGet(pWal->fileInfoSet, j);
|
|
||||||
if (pActualFile->firstVer < pMetaFile->firstVer) {
|
|
||||||
char fNameStr[WAL_FILE_LEN];
|
|
||||||
walBuildLogName(pWal, pActualFile->firstVer, fNameStr);
|
|
||||||
taosRemoveFile(fNameStr);
|
|
||||||
walBuildIdxName(pWal, pActualFile->firstVer, fNameStr);
|
|
||||||
taosRemoveFile(fNameStr);
|
|
||||||
i++;
|
|
||||||
} else if (pActualFile->firstVer > pMetaFile->firstVer) {
|
|
||||||
taosArrayRemove(pWal->fileInfoSet, j);
|
|
||||||
metaFileNum--;
|
|
||||||
} else {
|
|
||||||
i++;
|
|
||||||
j++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (i == actualFileNum && j == metaFileNum) {
|
|
||||||
if (j > 0) {
|
|
||||||
SWalFileInfo* pLastInfo = taosArrayGet(pWal->fileInfoSet, j - 1);
|
|
||||||
int64_t fsize = 0;
|
|
||||||
char fNameStr[WAL_FILE_LEN];
|
|
||||||
walBuildLogName(pWal, pLastInfo->firstVer, fNameStr);
|
|
||||||
taosStatFile(fNameStr, &fsize, NULL);
|
|
||||||
if (pLastInfo->fileSize != fsize) {
|
|
||||||
fixed = true;
|
|
||||||
pLastInfo->fileSize = fsize;
|
|
||||||
pLastInfo->lastVer = walScanLogGetLastVer(pWal);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
fixed = true;
|
|
||||||
while (i < actualFileNum) {
|
|
||||||
SWalFileInfo* pActualFile = taosArrayGet(actualLog, i);
|
|
||||||
char fNameStr[WAL_FILE_LEN];
|
|
||||||
walBuildLogName(pWal, pActualFile->firstVer, fNameStr);
|
|
||||||
taosStatFile(fNameStr, &pActualFile->fileSize, NULL);
|
|
||||||
|
|
||||||
if (pActualFile->fileSize == 0) {
|
|
||||||
ASSERT(i == actualFileNum - 1);
|
|
||||||
taosRemoveFile(fNameStr);
|
|
||||||
|
|
||||||
walBuildIdxName(pWal, pActualFile->firstVer, fNameStr);
|
|
||||||
taosRemoveFile(fNameStr);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (i < actualFileNum - 1) {
|
|
||||||
pActualFile->lastVer = ((SWalFileInfo*)taosArrayGet(actualLog, i + 1))->firstVer - 1;
|
|
||||||
taosArrayPush(pWal->fileInfoSet, pActualFile);
|
|
||||||
i++;
|
|
||||||
} else {
|
|
||||||
pActualFile = taosArrayPush(pWal->fileInfoSet, pActualFile);
|
|
||||||
pActualFile->lastVer = walScanLogGetLastVer(pWal);
|
|
||||||
if (pActualFile->lastVer == -1) {
|
|
||||||
taosRemoveFile(fNameStr);
|
|
||||||
|
|
||||||
walBuildIdxName(pWal, pActualFile->firstVer, fNameStr);
|
|
||||||
taosRemoveFile(fNameStr);
|
|
||||||
taosArrayPop(pWal->fileInfoSet);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
if (metaFileNum > actualFileNum) {
|
|
||||||
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
|
|
||||||
} else if (metaFileNum < actualFileNum) {
|
|
||||||
for (int i = metaFileNum; i < actualFileNum; i++) {
|
|
||||||
SWalFileInfo* pFileInfo = taosArrayGet(actualLog, i);
|
|
||||||
taosArrayPush(pWal->fileInfoSet, pFileInfo);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
taosArrayDestroy(actualLog);
|
|
||||||
|
|
||||||
actualFileNum = taosArrayGetSize(pWal->fileInfoSet);
|
actualFileNum = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
pWal->writeCur = actualFileNum - 1;
|
pWal->writeCur = actualFileNum - 1;
|
||||||
|
pWal->totSize = totSize;
|
||||||
|
pWal->vers.firstVer = -1;
|
||||||
|
pWal->vers.lastVer = -1;
|
||||||
if (actualFileNum > 0) {
|
if (actualFileNum > 0) {
|
||||||
int64_t fLastVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur))->lastVer;
|
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
|
||||||
if (fLastVer != -1 && pWal->vers.lastVer != fLastVer) {
|
pWal->vers.lastVer = ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer;
|
||||||
fixed = true;
|
}
|
||||||
pWal->vers.lastVer = fLastVer;
|
(void)walAlignVersions(pWal);
|
||||||
}
|
|
||||||
int64_t fFirstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
|
// update meta file
|
||||||
if (fFirstVer != pWal->vers.firstVer) {
|
if (updateMeta) {
|
||||||
fixed = true;
|
(void)walSaveMeta(pWal);
|
||||||
pWal->vers.firstVer = fFirstVer;
|
}
|
||||||
}
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) {
|
||||||
|
if (taosLSeekFile(pLogFile, offset, SEEK_SET) < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fixed) {
|
if (taosReadFile(pLogFile, pCkHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
|
||||||
walSaveMeta(pWal);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (walValidHeadCksum(pCkHead) != 0) {
|
||||||
|
terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int walCheckAndRepairIdx(SWal* pWal) {
|
int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
||||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
ASSERT(fileIdx >= 0 && fileIdx < sz);
|
||||||
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
|
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
|
||||||
|
char fnameStr[WAL_FILE_LEN];
|
||||||
|
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
|
||||||
|
char fLogNameStr[WAL_FILE_LEN];
|
||||||
|
walBuildLogName(pWal, pFileInfo->firstVer, fLogNameStr);
|
||||||
|
int64_t fileSize = 0;
|
||||||
|
|
||||||
char fnameStr[WAL_FILE_LEN];
|
if (taosStatFile(fnameStr, &fileSize, NULL) < 0 && errno != ENOENT) {
|
||||||
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
|
wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
|
||||||
int64_t fsize;
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE);
|
return -1;
|
||||||
if (pIdxFile == NULL) {
|
}
|
||||||
ASSERT(0);
|
|
||||||
|
ASSERT(pFileInfo->fileSize > 0 && pFileInfo->firstVer >= 0 && pFileInfo->lastVer >= pFileInfo->firstVer);
|
||||||
|
if (fileSize == (pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// start to repair
|
||||||
|
int64_t offset = fileSize - fileSize % sizeof(SWalIdxEntry);
|
||||||
|
TdFilePtr pLogFile = NULL;
|
||||||
|
TdFilePtr pIdxFile = NULL;
|
||||||
|
SWalIdxEntry idxEntry = {.ver = pFileInfo->firstVer - 1, .offset = -sizeof(SWalCkHead)};
|
||||||
|
SWalCkHead ckHead;
|
||||||
|
memset(&ckHead, 0, sizeof(ckHead));
|
||||||
|
ckHead.head.version = idxEntry.ver;
|
||||||
|
|
||||||
|
pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE);
|
||||||
|
if (pIdxFile == NULL) {
|
||||||
|
wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
pLogFile = taosOpenFile(fLogNameStr, TD_FILE_READ);
|
||||||
|
if (pLogFile == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fLogNameStr, terrstr());
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// determine the last valid entry end, i.e. offset
|
||||||
|
while ((offset -= sizeof(SWalIdxEntry)) >= 0) {
|
||||||
|
if (taosLSeekFile(pIdxFile, offset, SEEK_SET) < 0) {
|
||||||
|
wError("vgId:%d, failed to seek file due to %s. offset:" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno),
|
||||||
|
offset, fnameStr);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fnameStr, terrstr());
|
goto _err;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosFStatFile(pIdxFile, &fsize, NULL);
|
if (taosReadFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
|
||||||
if (fsize == (pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) {
|
wError("vgId:%d, failed to read file due to %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno),
|
||||||
taosCloseFile(&pIdxFile);
|
offset, fnameStr);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (idxEntry.ver > pFileInfo->lastVer) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t left = fsize % sizeof(SWalIdxEntry);
|
if (walReadLogHead(pLogFile, idxEntry.offset, &ckHead) < 0) {
|
||||||
int64_t offset = taosLSeekFile(pIdxFile, -left, SEEK_END);
|
wWarn("vgId:%d, failed to read log file since %s. file:%s, offset:%" PRId64 ", idx entry ver:%" PRId64 "",
|
||||||
if (left != 0) {
|
pWal->cfg.vgId, terrstr(), fLogNameStr, idxEntry.offset, idxEntry.ver);
|
||||||
taosFtruncateFile(pIdxFile, offset);
|
continue;
|
||||||
wWarn("vgId:%d wal truncate file %s to offset %ld since size invalid, file size %ld", pWal->cfg.vgId, fnameStr,
|
|
||||||
offset, fsize);
|
|
||||||
}
|
|
||||||
offset -= sizeof(SWalIdxEntry);
|
|
||||||
|
|
||||||
SWalIdxEntry idxEntry = {.ver = pFileInfo->firstVer};
|
|
||||||
while (1) {
|
|
||||||
if (offset < 0) {
|
|
||||||
taosLSeekFile(pIdxFile, 0, SEEK_SET);
|
|
||||||
taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
taosLSeekFile(pIdxFile, offset, SEEK_SET);
|
|
||||||
int64_t contLen = taosReadFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry));
|
|
||||||
if (contLen < 0 || contLen != sizeof(SWalIdxEntry)) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if ((idxEntry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry) != offset) {
|
|
||||||
taosFtruncateFile(pIdxFile, offset);
|
|
||||||
wWarn("vgId:%d wal truncate file %s to offset %ld since entry invalid, entry ver %ld, entry offset %ld",
|
|
||||||
pWal->cfg.vgId, fnameStr, offset, idxEntry.ver, idxEntry.offset);
|
|
||||||
offset -= sizeof(SWalIdxEntry);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (idxEntry.ver < pFileInfo->lastVer) {
|
if (idxEntry.ver == ckHead.head.version) {
|
||||||
char fLogNameStr[WAL_FILE_LEN];
|
break;
|
||||||
walBuildLogName(pWal, pFileInfo->firstVer, fLogNameStr);
|
}
|
||||||
TdFilePtr pLogFile = taosOpenFile(fLogNameStr, TD_FILE_READ);
|
}
|
||||||
if (pLogFile == NULL) {
|
offset += sizeof(SWalIdxEntry);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fLogNameStr, terrstr());
|
// ftruncate idx file
|
||||||
return -1;
|
if (offset < fileSize) {
|
||||||
}
|
if (taosFtruncateFile(pIdxFile, offset) < 0) {
|
||||||
while (idxEntry.ver < pFileInfo->lastVer) {
|
wError("vgId:%d, failed to ftruncate file due to %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
|
||||||
if (taosLSeekFile(pLogFile, idxEntry.offset, SEEK_SET) == -1) {
|
strerror(errno), offset, fnameStr);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d, cannot seek file %s at %ld, since %s", pWal->cfg.vgId, fLogNameStr, idxEntry.offset,
|
goto _err;
|
||||||
terrstr());
|
}
|
||||||
return -1;
|
}
|
||||||
}
|
|
||||||
SWalCkHead ckHead;
|
// rebuild idx file
|
||||||
taosReadFile(pLogFile, &ckHead, sizeof(SWalCkHead));
|
if (taosLSeekFile(pIdxFile, 0, SEEK_END) < 0) {
|
||||||
if (idxEntry.ver != ckHead.head.version) {
|
wError("vgId:%d, failed to seek file due to %s. offset:" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno),
|
||||||
// todo truncate this idx also
|
offset, fnameStr);
|
||||||
taosCloseFile(&pLogFile);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d, invalid repair case, log seek to %ld to find ver %ld, actual ver %ld", pWal->cfg.vgId,
|
goto _err;
|
||||||
idxEntry.offset, idxEntry.ver, ckHead.head.version);
|
}
|
||||||
return -1;
|
|
||||||
}
|
while (idxEntry.ver < pFileInfo->lastVer) {
|
||||||
idxEntry.ver = ckHead.head.version + 1;
|
ASSERT(idxEntry.ver == ckHead.head.version);
|
||||||
idxEntry.offset = idxEntry.offset + sizeof(SWalCkHead) + ckHead.head.bodyLen;
|
|
||||||
wWarn("vgId:%d wal idx append new entry %ld %ld", pWal->cfg.vgId, idxEntry.ver, idxEntry.offset);
|
idxEntry.ver += 1;
|
||||||
taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry));
|
idxEntry.offset += sizeof(SWalCkHead) + ckHead.head.bodyLen;
|
||||||
}
|
|
||||||
taosCloseFile(&pLogFile);
|
if (walReadLogHead(pLogFile, idxEntry.offset, &ckHead) < 0) {
|
||||||
|
wError("vgId:%d, failed to read wal log head since %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, terrstr(),
|
||||||
|
idxEntry.offset, fLogNameStr);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
wWarn("vgId:%d wal idx append new entry %ld %ld", pWal->cfg.vgId, idxEntry.ver, idxEntry.offset);
|
||||||
|
if (taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) {
|
||||||
|
wError("vgId:%d, failed to append file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosFsyncFile(pIdxFile) < 0) {
|
||||||
|
wError("vgId:%d, faild to fsync file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
(void)taosCloseFile(&pLogFile);
|
||||||
|
(void)taosCloseFile(&pIdxFile);
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
(void)taosCloseFile(&pLogFile);
|
||||||
|
(void)taosCloseFile(&pIdxFile);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int walCheckAndRepairIdx(SWal* pWal) {
|
||||||
|
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
|
int32_t fileIdx = sz;
|
||||||
|
while (--fileIdx >= 0) {
|
||||||
|
if (walCheckAndRepairIdxFile(pWal, fileIdx) < 0) {
|
||||||
|
wError("vgId:%d, failed to repair idx file since %s. fileIdx:%d", pWal->cfg.vgId, terrstr(), fileIdx);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
taosCloseFile(&pIdxFile);
|
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -529,7 +668,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
|
||||||
taosArrayEnsureCap(pArray, sz);
|
taosArrayEnsureCap(pArray, sz);
|
||||||
SWalFileInfo* pData = pArray->pData;
|
SWalFileInfo* pData = pArray->pData;
|
||||||
for (int i = 0; i < sz; i++) {
|
for (int i = 0; i < sz; i++) {
|
||||||
cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i);
|
cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i);
|
||||||
if (!pInfoJson) goto _err;
|
if (!pInfoJson) goto _err;
|
||||||
SWalFileInfo* pInfo = &pData[i];
|
SWalFileInfo* pInfo = &pData[i];
|
||||||
pField = cJSON_GetObjectItem(pInfoJson, "firstVer");
|
pField = cJSON_GetObjectItem(pInfoJson, "firstVer");
|
||||||
|
@ -593,7 +732,20 @@ int walSaveMeta(SWal* pWal) {
|
||||||
int metaVer = walFindCurMetaVer(pWal);
|
int metaVer = walFindCurMetaVer(pWal);
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
char tmpFnameStr[WAL_FILE_LEN];
|
char tmpFnameStr[WAL_FILE_LEN];
|
||||||
int n;
|
int n;
|
||||||
|
|
||||||
|
// fsync the idx and log file at first to ensure validity of meta
|
||||||
|
if (taosFsyncFile(pWal->pIdxFile) < 0) {
|
||||||
|
wError("vgId:%d, failed to sync idx file due to %s", pWal->cfg.vgId, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosFsyncFile(pWal->pLogFile) < 0) {
|
||||||
|
wError("vgId:%d, failed to sync log file due to %s", pWal->cfg.vgId, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
// flush to a tmpfile
|
// flush to a tmpfile
|
||||||
n = walBuildTmpMetaName(pWal, tmpFnameStr);
|
n = walBuildTmpMetaName(pWal, tmpFnameStr);
|
||||||
|
@ -601,7 +753,7 @@ int walSaveMeta(SWal* pWal) {
|
||||||
|
|
||||||
TdFilePtr pMetaFile = taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
TdFilePtr pMetaFile = taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
if (pMetaFile == NULL) {
|
if (pMetaFile == NULL) {
|
||||||
wError("failed to open file due to %s. file:%s", strerror(errno), tmpFnameStr);
|
wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -609,20 +761,19 @@ int walSaveMeta(SWal* pWal) {
|
||||||
char* serialized = walMetaSerialize(pWal);
|
char* serialized = walMetaSerialize(pWal);
|
||||||
int len = strlen(serialized);
|
int len = strlen(serialized);
|
||||||
if (len != taosWriteFile(pMetaFile, serialized, len)) {
|
if (len != taosWriteFile(pMetaFile, serialized, len)) {
|
||||||
// TODO:clean file
|
wError("vgId:%d, failed to write file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
|
||||||
wError("failed to write file due to %s. file:%s", strerror(errno), tmpFnameStr);
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosFsyncFile(pMetaFile) < 0) {
|
if (taosFsyncFile(pMetaFile) < 0) {
|
||||||
wError("failed to sync file due to %s. file:%s", strerror(errno), tmpFnameStr);
|
wError("vgId:%d, failed to sync file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosCloseFile(&pMetaFile) < 0) {
|
if (taosCloseFile(&pMetaFile) < 0) {
|
||||||
wError("failed to close file due to %s. file:%s", strerror(errno), tmpFnameStr);
|
wError("vgId:%d, failed to close file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,8 +110,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
// init ref
|
// init ref
|
||||||
pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||||
if (pWal->pRefHash == NULL) {
|
if (pWal->pRefHash == NULL) {
|
||||||
wError("failed to init hash since %s", tstrerror(terrno));
|
wError("failed to init hash since %s", tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// open meta
|
// open meta
|
||||||
|
@ -149,8 +149,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
// add ref
|
// add ref
|
||||||
pWal->refId = taosAddRef(tsWal.refSetId, pWal);
|
pWal->refId = taosAddRef(tsWal.refSetId, pWal);
|
||||||
if (pWal->refId < 0) {
|
if (pWal->refId < 0) {
|
||||||
wError("failed to add ref for Wal since %s", tstrerror(terrno));
|
wError("failed to add ref for Wal since %s", tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
|
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
|
||||||
|
|
|
@ -476,7 +476,8 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
}
|
}
|
||||||
wError("vgId:%d, failed to read WAL record head from log file since %s", pReader->pWal->cfg.vgId, terrstr());
|
wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s",
|
||||||
|
pReader->pWal->cfg.vgId, ver, terrstr());
|
||||||
taosThreadMutexUnlock(&pReader->mutex);
|
taosThreadMutexUnlock(&pReader->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -509,7 +510,8 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
|
||||||
else {
|
else {
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
}
|
}
|
||||||
wError("vgId:%d, failed to read WAL record body from log file since %s", pReader->pWal->cfg.vgId, terrstr());
|
wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s",
|
||||||
|
pReader->pWal->cfg.vgId, ver, terrstr());
|
||||||
taosThreadMutexUnlock(&pReader->mutex);
|
taosThreadMutexUnlock(&pReader->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,6 +79,11 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
|
||||||
TdFilePtr pIdxTFile, pLogTFile;
|
TdFilePtr pIdxTFile, pLogTFile;
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
if (pWal->pLogFile != NULL) {
|
if (pWal->pLogFile != NULL) {
|
||||||
|
code = taosFsyncFile(pWal->pLogFile);
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
code = taosCloseFile(&pWal->pLogFile);
|
code = taosCloseFile(&pWal->pLogFile);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
@ -86,6 +91,11 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pWal->pIdxFile != NULL) {
|
if (pWal->pIdxFile != NULL) {
|
||||||
|
code = taosFsyncFile(pWal->pIdxFile);
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
code = taosCloseFile(&pWal->pIdxFile);
|
code = taosCloseFile(&pWal->pIdxFile);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
|
|
@ -540,6 +540,11 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
|
||||||
|
|
||||||
void walFsync(SWal *pWal, bool forceFsync) {
|
void walFsync(SWal *pWal, bool forceFsync) {
|
||||||
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
|
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
|
||||||
|
wTrace("vgId:%d, fileId:%" PRId64 ".idx, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
|
||||||
|
if (taosFsyncFile(pWal->pIdxFile) < 0) {
|
||||||
|
wError("vgId:%d, file:%" PRId64 ".idx, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
|
||||||
|
strerror(errno));
|
||||||
|
}
|
||||||
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
|
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
|
||||||
if (taosFsyncFile(pWal->pLogFile) < 0) {
|
if (taosFsyncFile(pWal->pLogFile) < 0) {
|
||||||
wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
|
wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
|
||||||
|
|
|
@ -447,12 +447,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND, "TQ table schema not f
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "TQ no commited offset")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "TQ no commited offset")
|
||||||
|
|
||||||
// wal
|
// wal
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Wal unexpected generic error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "WAL unexpected generic error")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_OUT_OF_MEMORY, "WAL out of memory")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_OUT_OF_MEMORY, "WAL out of memory")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_LOG_NOT_EXIST, "WAL log not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_LOG_NOT_EXIST, "WAL log not exist")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_CHKSUM_MISMATCH, "WAL checksum mismatch")
|
||||||
|
|
||||||
// tfs
|
// tfs
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config")
|
TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config")
|
||||||
|
|
Loading…
Reference in New Issue