fix: fsync wal files and their meta if data size unsynced exceeds a limit
This commit is contained in:
parent
bfb92713b4
commit
cccfa17027
|
@ -34,6 +34,7 @@ typedef struct {
|
||||||
int64_t createTs;
|
int64_t createTs;
|
||||||
int64_t closeTs;
|
int64_t closeTs;
|
||||||
int64_t fileSize;
|
int64_t fileSize;
|
||||||
|
int64_t syncedOffset;
|
||||||
} SWalFileInfo;
|
} SWalFileInfo;
|
||||||
|
|
||||||
typedef struct WalIdxEntry {
|
typedef struct WalIdxEntry {
|
||||||
|
@ -66,6 +67,12 @@ static inline int64_t walGetLastFileSize(SWal* pWal) {
|
||||||
return pInfo->fileSize;
|
return pInfo->fileSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline int64_t walGetLastFileCachedSize(SWal* pWal) {
|
||||||
|
if (taosArrayGetSize(pWal->fileInfoSet) == 0) return 0;
|
||||||
|
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
|
||||||
|
return (pInfo->fileSize - pInfo->syncedOffset);
|
||||||
|
}
|
||||||
|
|
||||||
static inline int64_t walGetLastFileFirstVer(SWal* pWal) {
|
static inline int64_t walGetLastFileFirstVer(SWal* pWal) {
|
||||||
if (taosArrayGetSize(pWal->fileInfoSet) == 0) return -1;
|
if (taosArrayGetSize(pWal->fileInfoSet) == 0) return -1;
|
||||||
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
|
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
|
||||||
|
|
|
@ -589,6 +589,7 @@ int walRollFileInfo(SWal* pWal) {
|
||||||
pNewInfo->createTs = ts;
|
pNewInfo->createTs = ts;
|
||||||
pNewInfo->closeTs = -1;
|
pNewInfo->closeTs = -1;
|
||||||
pNewInfo->fileSize = 0;
|
pNewInfo->fileSize = 0;
|
||||||
|
pNewInfo->syncedOffset = 0;
|
||||||
taosArrayPush(pArray, pNewInfo);
|
taosArrayPush(pArray, pNewInfo);
|
||||||
taosMemoryFree(pNewInfo);
|
taosMemoryFree(pNewInfo);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -739,6 +740,12 @@ static int walFindCurMetaVer(SWal* pWal) {
|
||||||
return metaVer;
|
return metaVer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void walUpdateSyncedOffset(SWal* pWal) {
|
||||||
|
SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal);
|
||||||
|
if (pFileInfo == NULL) return;
|
||||||
|
pFileInfo->syncedOffset = pFileInfo->fileSize;
|
||||||
|
}
|
||||||
|
|
||||||
int walSaveMeta(SWal* pWal) {
|
int walSaveMeta(SWal* pWal) {
|
||||||
int metaVer = walFindCurMetaVer(pWal);
|
int metaVer = walFindCurMetaVer(pWal);
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
|
@ -758,6 +765,9 @@ int walSaveMeta(SWal* pWal) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update synced offset
|
||||||
|
(void)walUpdateSyncedOffset(pWal);
|
||||||
|
|
||||||
// flush to a tmpfile
|
// flush to a tmpfile
|
||||||
n = walBuildTmpMetaName(pWal, tmpFnameStr);
|
n = walBuildTmpMetaName(pWal, tmpFnameStr);
|
||||||
ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name");
|
ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name");
|
||||||
|
|
|
@ -252,23 +252,36 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (walGetLastFileCachedSize(pWal) > WAL_RECOV_SIZE_LIMIT / 2) {
|
||||||
|
if (walSaveMeta(pWal) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
|
int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
|
||||||
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
|
||||||
pWal->vers.verInSnapshotting = ver;
|
pWal->vers.verInSnapshotting = ver;
|
||||||
wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64,
|
wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64,
|
||||||
pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
||||||
// check file rolling
|
// check file rolling
|
||||||
if (pWal->cfg.retentionPeriod == 0) {
|
if (pWal->cfg.retentionPeriod == 0) {
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
|
||||||
if (walGetLastFileSize(pWal) != 0) {
|
if (walGetLastFileSize(pWal) != 0) {
|
||||||
walRollImpl(pWal);
|
if (walRollImpl(pWal) < 0) {
|
||||||
|
wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr());
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
|
||||||
}
|
}
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walEndSnapshot(SWal *pWal) {
|
int32_t walEndSnapshot(SWal *pWal) {
|
||||||
|
|
Loading…
Reference in New Issue