refactor wal's meta & reader

This commit is contained in:
Minglei Jin 2024-07-18 09:51:31 +08:00
parent 5e4b7b35a5
commit d2be5e0464
2 changed files with 45 additions and 47 deletions

View File

@ -274,7 +274,7 @@ static void walRebuildFileInfoSet(SArray* metaLogList, SArray* actualLogList) {
}
}
void walAlignVersions(SWal* pWal) {
static void walAlignVersions(SWal* pWal) {
if (pWal->vers.firstVer > pWal->vers.snapshotVer + 1) {
wWarn("vgId:%d, firstVer:%" PRId64 " is larger than snapshotVer:%" PRId64 " + 1. align with it.", pWal->cfg.vgId,
pWal->vers.firstVer, pWal->vers.snapshotVer);
@ -294,7 +294,7 @@ void walAlignVersions(SWal* pWal) {
wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer);
}
int walRepairLogFileTs(SWal* pWal, bool* updateMeta) {
static int walRepairLogFileTs(SWal* pWal, bool* updateMeta) {
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
int32_t fileIdx = -1;
int32_t lastCloseTs = 0;
@ -324,7 +324,7 @@ int walRepairLogFileTs(SWal* pWal, bool* updateMeta) {
return 0;
}
bool walLogEntriesComplete(const SWal* pWal) {
static bool walLogEntriesComplete(const SWal* pWal) {
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
bool complete = true;
int32_t fileIdx = -1;
@ -352,7 +352,7 @@ bool walLogEntriesComplete(const SWal* pWal) {
return complete;
}
int walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
static int walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
ASSERT(pFileInfo != NULL);
char fnameStr[WAL_FILE_LEN];
@ -481,7 +481,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
pWal->vers.lastVer = ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer;
}
(void)walAlignVersions(pWal);
walAlignVersions(pWal);
// repair ts of files
if (walRepairLogFileTs(pWal, &updateMeta) < 0) {
@ -500,7 +500,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
return 0;
}
int walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) {
static int walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) {
if (taosLSeekFile(pLogFile, offset, SEEK_SET) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@ -519,7 +519,7 @@ int walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) {
return 0;
}
int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
static int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
char fnameStr[WAL_FILE_LEN];
@ -866,7 +866,7 @@ static int walFindCurMetaVer(SWal* pWal) {
return metaVer;
}
void walUpdateSyncedOffset(SWal* pWal) {
static void walUpdateSyncedOffset(SWal* pWal) {
SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal);
if (pFileInfo == NULL) return;
pFileInfo->syncedOffset = pFileInfo->fileSize;
@ -892,7 +892,7 @@ int walSaveMeta(SWal* pWal) {
}
// update synced offset
(void)walUpdateSyncedOffset(pWal);
walUpdateSyncedOffset(pWal);
// flush to a tmpfile
n = walBuildTmpMetaName(pWal, tmpFnameStr);

View File

@ -26,7 +26,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) {
}
pReader->pWal = pWal;
pReader->readerId = (id != 0)? id:tGenIdPI64();
pReader->readerId = (id != 0) ? id : tGenIdPI64();
pReader->pIdxFile = NULL;
pReader->pLogFile = NULL;
pReader->curVersion = -1;
@ -35,7 +35,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) {
if (cond) {
pReader->cond = *cond;
} else {
// pReader->cond.scanUncommited = 0;
// pReader->cond.scanUncommited = 0;
pReader->cond.scanNotApplied = 0;
pReader->cond.scanMeta = 0;
pReader->cond.enableRef = 0;
@ -58,7 +58,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) {
}
void walCloseReader(SWalReader *pReader) {
if(pReader == NULL) return;
if (pReader == NULL) return;
taosCloseFile(&pReader->pIdxFile);
taosCloseFile(&pReader->pLogFile);
@ -88,7 +88,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
(IS_META_MSG(type) && pReader->cond.scanMeta)) {
int32_t code = walFetchBody(pReader);
return (code == TSDB_CODE_SUCCESS)? 0:-1;
return (code == TSDB_CODE_SUCCESS) ? 0 : -1;
} else {
if (walSkipFetchBody(pReader) < 0) {
return -1;
@ -119,13 +119,13 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *eve
*ever = pReader->cond.scanUncommited ? lastVer : committedVer;
}
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset){
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset) {
// if offset version is small than first version , let's seek to first version
taosThreadMutexLock(&pWalReader->pWal->mutex);
int64_t firstVer = walGetFirstVer((pWalReader)->pWal);
taosThreadMutexUnlock(&pWalReader->pWal->mutex);
if (pOffset->version < firstVer){
if (pOffset->version < firstVer) {
pOffset->version = firstVer;
}
}
@ -199,7 +199,7 @@ static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
return 0;
}
int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
SWal *pWal = pReader->pWal;
// bsearch in fileSet
@ -224,8 +224,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
return -1;
}
wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId,
pReader->curVersion, ver);
wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver);
pReader->curVersion = ver;
return 0;
@ -240,7 +239,7 @@ int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
wInfo("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
ver, pWal->vers.firstVer, pWal->vers.lastVer);
ver, pWal->vers.firstVer, pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
@ -252,7 +251,6 @@ int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
return 0;
}
int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
int64_t code;
int64_t contLen;
@ -276,7 +274,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
if (contLen == sizeof(SWalCkHead)) {
break;
} else if (contLen == 0 && !seeked) {
if(walReadSeekVerImpl(pRead, ver) < 0){
if (walReadSeekVerImpl(pRead, ver) < 0) {
return -1;
}
seeked = true;
@ -294,8 +292,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
code = walValidHeadCksum(pRead->pHead);
if (code != 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, 0x%"PRIx64, pRead->pWal->cfg.vgId, ver,
pRead->readerId);
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, 0x%" PRIx64,
pRead->pWal->cfg.vgId, ver, pRead->readerId);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
@ -304,15 +302,15 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
}
int32_t walSkipFetchBody(SWalReader *pRead) {
wDebug("vgId:%d, skip:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64
", applied:%" PRId64 ", 0x%" PRIx64,
wDebug("vgId:%d, skip:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64 ", applied:%" PRId64
", 0x%" PRIx64,
pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId);
int32_t plainBodyLen = pRead->pHead->head.bodyLen;
int32_t cryptedBodyLen = plainBodyLen;
//TODO: dmchen emun
if(pRead->pWal->cfg.encryptAlgorithm == 1){
// TODO: dmchen emun
if (pRead->pWal->cfg.encryptAlgorithm == 1) {
cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
}
int64_t code = taosLSeekFile(pRead->pLogFile, cryptedBodyLen, SEEK_CUR);
@ -339,8 +337,8 @@ int32_t walFetchBody(SWalReader *pRead) {
int32_t plainBodyLen = pReadHead->bodyLen;
int32_t cryptedBodyLen = plainBodyLen;
//TODO: dmchen emun
if(pRead->pWal->cfg.encryptAlgorithm == 1){
// TODO: dmchen emun
if (pRead->pWal->cfg.encryptAlgorithm == 1) {
cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
}
@ -358,10 +356,11 @@ int32_t walFetchBody(SWalReader *pRead) {
if (cryptedBodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, cryptedBodyLen)) {
if (plainBodyLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, 0x%"PRIx64,
vgId, pReadHead->version, ver, tstrerror(terrno), id);
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, 0x%" PRIx64, vgId,
pReadHead->version, ver, tstrerror(terrno), id);
} else {
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted, 0x%"PRIx64,
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64
", since file corrupted, 0x%" PRIx64,
vgId, pReadHead->version, ver, id);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
@ -369,7 +368,7 @@ int32_t walFetchBody(SWalReader *pRead) {
}
if (pReadHead->version != ver) {
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64", 0x%"PRIx64, vgId,
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64 ", 0x%" PRIx64, vgId,
pReadHead->version, ver, id);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
@ -378,7 +377,8 @@ int32_t walFetchBody(SWalReader *pRead) {
decryptBody(&pRead->pWal->cfg, pRead->pHead, plainBodyLen, __FUNCTION__);
if (walValidBodyCksum(pRead->pHead) != 0) {
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, 0x%" PRIx64, vgId, ver, id);
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, 0x%" PRIx64, vgId, ver,
id);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
@ -421,7 +421,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
if (contLen == sizeof(SWalCkHead)) {
break;
} else if (contLen == 0 && !seeked) {
if(walReadSeekVerImpl(pReader, ver) < 0){
if (walReadSeekVerImpl(pReader, ver) < 0) {
taosThreadMutexUnlock(&pReader->mutex);
return -1;
}
@ -451,15 +451,14 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
int32_t plainBodyLen = pReader->pHead->head.bodyLen;
int32_t cryptedBodyLen = plainBodyLen;
//TODO: dmchen emun
if(pReader->pWal->cfg.encryptAlgorithm == 1){
// TODO: dmchen emun
if (pReader->pWal->cfg.encryptAlgorithm == 1) {
cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
}
if (pReader->capacity < cryptedBodyLen) {
SWalCkHead *ptr =
(SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + cryptedBodyLen);
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + cryptedBodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosThreadMutexUnlock(&pReader->mutex);
@ -469,8 +468,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
pReader->capacity = cryptedBodyLen;
}
if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, cryptedBodyLen)) !=
cryptedBodyLen) {
if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, cryptedBodyLen)) != cryptedBodyLen) {
if (contLen < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
else {
@ -485,7 +483,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
if (pReader->pHead->head.version != ver) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
pReader->pHead->head.version, ver);
// pReader->curInvalid = 1;
// pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosThreadMutexUnlock(&pReader->mutex);
return -1;
@ -500,7 +498,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, plainBodyLen);
uint32_t logCkSum = pReader->pHead->cksumBody;
wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum);
// pReader->curInvalid = 1;
// pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosThreadMutexUnlock(&pReader->mutex);
return -1;
@ -512,8 +510,8 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
return 0;
}
void decryptBody(SWalCfg* cfg, SWalCkHead* pHead, int32_t plainBodyLen, const char* func) {
//TODO: dmchen emun
void decryptBody(SWalCfg *cfg, SWalCkHead *pHead, int32_t plainBodyLen, const char *func) {
// TODO: dmchen emun
if (cfg->encryptAlgorithm == 1) {
int32_t cryptedBodyLen = ENCRYPTED_LEN(plainBodyLen);
char *newBody = taosMemoryMalloc(cryptedBodyLen);
@ -527,7 +525,7 @@ void decryptBody(SWalCfg* cfg, SWalCkHead* pHead, int32_t plainBodyLen, const ch
int32_t count = CBC_Decrypt(&opts);
//wDebug("CBC_Decrypt cryptedBodyLen:%d, plainBodyLen:%d, %s", count, plainBodyLen, func);
// wDebug("CBC_Decrypt cryptedBodyLen:%d, plainBodyLen:%d, %s", count, plainBodyLen, func);
memcpy(pHead->head.body, newBody, plainBodyLen);