refactor(wal)
This commit is contained in:
parent
4e965b314d
commit
0b8e9af8ec
|
@ -103,8 +103,8 @@ typedef struct SWal {
|
|||
int32_t fsyncSeq;
|
||||
// meta
|
||||
SWalVer vers;
|
||||
TdFilePtr pWriteLogTFile;
|
||||
TdFilePtr pWriteIdxTFile;
|
||||
TdFilePtr pLogFile;
|
||||
TdFilePtr pIdxFile;
|
||||
int32_t writeCur;
|
||||
SArray *fileInfoSet; // SArray<SWalFileInfo>
|
||||
// status
|
||||
|
|
|
@ -101,8 +101,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
|||
|
||||
// open meta
|
||||
walResetVer(&pWal->vers);
|
||||
pWal->pWriteLogTFile = NULL;
|
||||
pWal->pWriteIdxTFile = NULL;
|
||||
pWal->pLogFile = NULL;
|
||||
pWal->pIdxFile = NULL;
|
||||
pWal->writeCur = -1;
|
||||
pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
|
||||
if (pWal->fileInfoSet == NULL) {
|
||||
|
@ -179,10 +179,10 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
|
|||
|
||||
void walClose(SWal *pWal) {
|
||||
taosThreadMutexLock(&pWal->mutex);
|
||||
taosCloseFile(&pWal->pWriteLogTFile);
|
||||
pWal->pWriteLogTFile = NULL;
|
||||
taosCloseFile(&pWal->pWriteIdxTFile);
|
||||
pWal->pWriteIdxTFile = NULL;
|
||||
taosCloseFile(&pWal->pLogFile);
|
||||
pWal->pLogFile = NULL;
|
||||
taosCloseFile(&pWal->pIdxFile);
|
||||
pWal->pIdxFile = NULL;
|
||||
walSaveMeta(pWal);
|
||||
taosArrayDestroy(pWal->fileInfoSet);
|
||||
pWal->fileInfoSet = NULL;
|
||||
|
@ -223,7 +223,7 @@ static void walFsyncAll() {
|
|||
if (walNeedFsync(pWal)) {
|
||||
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq,
|
||||
atomic_load_32(&tsWal.seq));
|
||||
int32_t code = taosFsyncFile(pWal->pWriteLogTFile);
|
||||
int32_t code = taosFsyncFile(pWal->pLogFile);
|
||||
if (code != 0) {
|
||||
wError("vgId:%d, file:%" PRId64 ".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||
strerror(code));
|
||||
|
|
|
@ -22,8 +22,8 @@
|
|||
static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
|
||||
int64_t code = 0;
|
||||
|
||||
TdFilePtr pIdxTFile = pWal->pWriteIdxTFile;
|
||||
TdFilePtr pLogTFile = pWal->pWriteLogTFile;
|
||||
TdFilePtr pIdxTFile = pWal->pIdxFile;
|
||||
TdFilePtr pLogTFile = pWal->pLogFile;
|
||||
|
||||
// seek position
|
||||
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
|
||||
|
@ -68,8 +68,8 @@ int walInitWriteFile(SWal* pWal) {
|
|||
return -1;
|
||||
}
|
||||
// switch file
|
||||
pWal->pWriteIdxTFile = pIdxTFile;
|
||||
pWal->pWriteLogTFile = pLogTFile;
|
||||
pWal->pIdxFile = pIdxTFile;
|
||||
pWal->pLogFile = pLogTFile;
|
||||
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
||||
return 0;
|
||||
}
|
||||
|
@ -78,15 +78,15 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
|
|||
int code;
|
||||
TdFilePtr pIdxTFile, pLogTFile;
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
if (pWal->pWriteLogTFile != NULL) {
|
||||
code = taosCloseFile(&pWal->pWriteLogTFile);
|
||||
if (pWal->pLogFile != NULL) {
|
||||
code = taosCloseFile(&pWal->pLogFile);
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
if (pWal->pWriteIdxTFile != NULL) {
|
||||
code = taosCloseFile(&pWal->pWriteIdxTFile);
|
||||
if (pWal->pIdxFile != NULL) {
|
||||
code = taosCloseFile(&pWal->pIdxFile);
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
|
@ -106,7 +106,7 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
|
|||
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
if (pIdxTFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
pWal->pWriteIdxTFile = NULL;
|
||||
pWal->pIdxFile = NULL;
|
||||
return -1;
|
||||
}
|
||||
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
||||
|
@ -114,12 +114,12 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
|
|||
if (pLogTFile == NULL) {
|
||||
taosCloseFile(&pIdxTFile);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
pWal->pWriteLogTFile = NULL;
|
||||
pWal->pLogFile = NULL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pWal->pWriteLogTFile = pLogTFile;
|
||||
pWal->pWriteIdxTFile = pIdxTFile;
|
||||
pWal->pLogFile = pLogTFile;
|
||||
pWal->pIdxFile = pIdxTFile;
|
||||
pWal->writeCur = idx;
|
||||
return fileFirstVer;
|
||||
}
|
||||
|
|
|
@ -32,8 +32,8 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
|||
}
|
||||
}
|
||||
|
||||
taosCloseFile(&pWal->pWriteLogTFile);
|
||||
taosCloseFile(&pWal->pWriteIdxTFile);
|
||||
taosCloseFile(&pWal->pLogFile);
|
||||
taosCloseFile(&pWal->pIdxFile);
|
||||
|
||||
if (pWal->vers.firstVer != -1) {
|
||||
int32_t fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
|
||||
|
@ -324,34 +324,34 @@ END:
|
|||
|
||||
int32_t walRollImpl(SWal *pWal) {
|
||||
int32_t code = 0;
|
||||
if (pWal->pWriteIdxTFile != NULL) {
|
||||
code = taosCloseFile(&pWal->pWriteIdxTFile);
|
||||
if (pWal->pIdxFile != NULL) {
|
||||
code = taosCloseFile(&pWal->pIdxFile);
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
if (pWal->pWriteLogTFile != NULL) {
|
||||
code = taosCloseFile(&pWal->pWriteLogTFile);
|
||||
if (pWal->pLogFile != NULL) {
|
||||
code = taosCloseFile(&pWal->pLogFile);
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
TdFilePtr pIdxTFile, pLogTFile;
|
||||
TdFilePtr pIdxFile, pLogFile;
|
||||
// create new file
|
||||
int64_t newFileFirstVersion = pWal->vers.lastVer + 1;
|
||||
int64_t newFileFirstVer = pWal->vers.lastVer + 1;
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
|
||||
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
if (pIdxTFile == NULL) {
|
||||
walBuildIdxName(pWal, newFileFirstVer, fnameStr);
|
||||
pIdxFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
if (pIdxFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
|
||||
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
if (pLogTFile == NULL) {
|
||||
walBuildLogName(pWal, newFileFirstVer, fnameStr);
|
||||
pLogFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
if (pLogFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
code = -1;
|
||||
goto END;
|
||||
|
@ -363,8 +363,8 @@ int32_t walRollImpl(SWal *pWal) {
|
|||
}
|
||||
|
||||
// switch file
|
||||
pWal->pWriteIdxTFile = pIdxTFile;
|
||||
pWal->pWriteLogTFile = pLogTFile;
|
||||
pWal->pIdxFile = pIdxFile;
|
||||
pWal->pLogFile = pLogFile;
|
||||
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
||||
ASSERT(pWal->writeCur >= 0);
|
||||
|
||||
|
@ -378,10 +378,10 @@ END:
|
|||
|
||||
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||
SWalIdxEntry entry = {.ver = ver, .offset = offset};
|
||||
int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END);
|
||||
int64_t idxOffset = taosLSeekFile(pWal->pIdxFile, 0, SEEK_END);
|
||||
wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset,
|
||||
idxOffset);
|
||||
int64_t size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry));
|
||||
int64_t size = taosWriteFile(pWal->pIdxFile, &entry, sizeof(SWalIdxEntry));
|
||||
if (size != sizeof(SWalIdxEntry)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// TODO truncate
|
||||
|
@ -407,7 +407,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
|||
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
|
||||
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
|
||||
|
||||
if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
|
||||
if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
|
||||
// TODO ftruncate
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||
|
@ -416,7 +416,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
|||
goto END;
|
||||
}
|
||||
|
||||
if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) {
|
||||
if (taosWriteFile(pWal->pLogFile, (char *)body, bodyLen) != bodyLen) {
|
||||
// TODO ftruncate
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||
|
@ -456,14 +456,14 @@ int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const vo
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) {
|
||||
if (pWal->pIdxFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) {
|
||||
if (walInitWriteFile(pWal) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0);
|
||||
ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0);
|
||||
|
||||
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
|
@ -494,14 +494,14 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) {
|
||||
if (pWal->pIdxFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) {
|
||||
if (walInitWriteFile(pWal) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0);
|
||||
ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0);
|
||||
|
||||
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
|
@ -524,7 +524,7 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
|
|||
void walFsync(SWal *pWal, bool forceFsync) {
|
||||
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
|
||||
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
|
||||
if (taosFsyncFile(pWal->pWriteLogTFile) < 0) {
|
||||
if (taosFsyncFile(pWal->pLogFile) < 0) {
|
||||
wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
|
||||
strerror(errno));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue