diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index ac33b1d6b7..39ec80db87 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -274,7 +274,7 @@ static void walRebuildFileInfoSet(SArray* metaLogList, SArray* actualLogList) { } } -void walAlignVersions(SWal* pWal) { +static void walAlignVersions(SWal* pWal) { if (pWal->vers.firstVer > pWal->vers.snapshotVer + 1) { wWarn("vgId:%d, firstVer:%" PRId64 " is larger than snapshotVer:%" PRId64 " + 1. align with it.", pWal->cfg.vgId, pWal->vers.firstVer, pWal->vers.snapshotVer); @@ -294,7 +294,7 @@ void walAlignVersions(SWal* pWal) { wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer); } -int walRepairLogFileTs(SWal* pWal, bool* updateMeta) { +static int walRepairLogFileTs(SWal* pWal, bool* updateMeta) { int32_t sz = taosArrayGetSize(pWal->fileInfoSet); int32_t fileIdx = -1; int32_t lastCloseTs = 0; @@ -324,7 +324,7 @@ int walRepairLogFileTs(SWal* pWal, bool* updateMeta) { return 0; } -bool walLogEntriesComplete(const SWal* pWal) { +static bool walLogEntriesComplete(const SWal* pWal) { int32_t sz = taosArrayGetSize(pWal->fileInfoSet); bool complete = true; int32_t fileIdx = -1; @@ -352,7 +352,7 @@ bool walLogEntriesComplete(const SWal* pWal) { return complete; } -int walTrimIdxFile(SWal* pWal, int32_t fileIdx) { +static int walTrimIdxFile(SWal* pWal, int32_t fileIdx) { SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); ASSERT(pFileInfo != NULL); char fnameStr[WAL_FILE_LEN]; @@ -481,7 +481,7 @@ int walCheckAndRepairMeta(SWal* pWal) { pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; pWal->vers.lastVer = ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer; } - (void)walAlignVersions(pWal); + walAlignVersions(pWal); // repair ts of files if (walRepairLogFileTs(pWal, &updateMeta) < 0) { @@ -500,7 +500,7 @@ int walCheckAndRepairMeta(SWal* pWal) { return 0; } -int walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) { +static int walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) { if (taosLSeekFile(pLogFile, offset, SEEK_SET) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -519,7 +519,7 @@ int walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) { return 0; } -int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { +static int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { int32_t sz = taosArrayGetSize(pWal->fileInfoSet); SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); char fnameStr[WAL_FILE_LEN]; @@ -866,7 +866,7 @@ static int walFindCurMetaVer(SWal* pWal) { return metaVer; } -void walUpdateSyncedOffset(SWal* pWal) { +static void walUpdateSyncedOffset(SWal* pWal) { SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal); if (pFileInfo == NULL) return; pFileInfo->syncedOffset = pFileInfo->fileSize; @@ -892,7 +892,7 @@ int walSaveMeta(SWal* pWal) { } // update synced offset - (void)walUpdateSyncedOffset(pWal); + walUpdateSyncedOffset(pWal); // flush to a tmpfile n = walBuildTmpMetaName(pWal, tmpFnameStr); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index c164940f9b..310258a55e 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -26,7 +26,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) { } pReader->pWal = pWal; - pReader->readerId = (id != 0)? id:tGenIdPI64(); + pReader->readerId = (id != 0) ? id : tGenIdPI64(); pReader->pIdxFile = NULL; pReader->pLogFile = NULL; pReader->curVersion = -1; @@ -35,7 +35,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) { if (cond) { pReader->cond = *cond; } else { -// pReader->cond.scanUncommited = 0; + // pReader->cond.scanUncommited = 0; pReader->cond.scanNotApplied = 0; pReader->cond.scanMeta = 0; pReader->cond.enableRef = 0; @@ -58,7 +58,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) { } void walCloseReader(SWalReader *pReader) { - if(pReader == NULL) return; + if (pReader == NULL) return; taosCloseFile(&pReader->pIdxFile); taosCloseFile(&pReader->pLogFile); @@ -88,7 +88,7 @@ int32_t walNextValidMsg(SWalReader *pReader) { if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) || (IS_META_MSG(type) && pReader->cond.scanMeta)) { int32_t code = walFetchBody(pReader); - return (code == TSDB_CODE_SUCCESS)? 0:-1; + return (code == TSDB_CODE_SUCCESS) ? 0 : -1; } else { if (walSkipFetchBody(pReader) < 0) { return -1; @@ -119,13 +119,13 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *eve *ever = pReader->cond.scanUncommited ? lastVer : committedVer; } -void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset){ +void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset) { // if offset version is small than first version , let's seek to first version taosThreadMutexLock(&pWalReader->pWal->mutex); int64_t firstVer = walGetFirstVer((pWalReader)->pWal); taosThreadMutexUnlock(&pWalReader->pWal->mutex); - if (pOffset->version < firstVer){ + if (pOffset->version < firstVer) { pOffset->version = firstVer; } } @@ -199,7 +199,7 @@ static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) { return 0; } -int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { +static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { SWal *pWal = pReader->pWal; // bsearch in fileSet @@ -224,8 +224,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { return -1; } - wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, - pReader->curVersion, ver); + wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver); pReader->curVersion = ver; return 0; @@ -240,7 +239,7 @@ int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) { if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { wInfo("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId, - ver, pWal->vers.firstVer, pWal->vers.lastVer); + ver, pWal->vers.firstVer, pWal->vers.lastVer); terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } @@ -252,7 +251,6 @@ int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) { return 0; } - int32_t walFetchHead(SWalReader *pRead, int64_t ver) { int64_t code; int64_t contLen; @@ -276,7 +274,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { if (contLen == sizeof(SWalCkHead)) { break; } else if (contLen == 0 && !seeked) { - if(walReadSeekVerImpl(pRead, ver) < 0){ + if (walReadSeekVerImpl(pRead, ver) < 0) { return -1; } seeked = true; @@ -294,8 +292,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { code = walValidHeadCksum(pRead->pHead); if (code != 0) { - wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, 0x%"PRIx64, pRead->pWal->cfg.vgId, ver, - pRead->readerId); + wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, 0x%" PRIx64, + pRead->pWal->cfg.vgId, ver, pRead->readerId); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -304,15 +302,15 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { } int32_t walSkipFetchBody(SWalReader *pRead) { - wDebug("vgId:%d, skip:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64 - ", applied:%" PRId64 ", 0x%" PRIx64, + wDebug("vgId:%d, skip:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64 ", applied:%" PRId64 + ", 0x%" PRIx64, pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId); int32_t plainBodyLen = pRead->pHead->head.bodyLen; int32_t cryptedBodyLen = plainBodyLen; - //TODO: dmchen emun - if(pRead->pWal->cfg.encryptAlgorithm == 1){ + // TODO: dmchen emun + if (pRead->pWal->cfg.encryptAlgorithm == 1) { cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen); } int64_t code = taosLSeekFile(pRead->pLogFile, cryptedBodyLen, SEEK_CUR); @@ -339,8 +337,8 @@ int32_t walFetchBody(SWalReader *pRead) { int32_t plainBodyLen = pReadHead->bodyLen; int32_t cryptedBodyLen = plainBodyLen; - //TODO: dmchen emun - if(pRead->pWal->cfg.encryptAlgorithm == 1){ + // TODO: dmchen emun + if (pRead->pWal->cfg.encryptAlgorithm == 1) { cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen); } @@ -358,10 +356,11 @@ int32_t walFetchBody(SWalReader *pRead) { if (cryptedBodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, cryptedBodyLen)) { if (plainBodyLen < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, 0x%"PRIx64, - vgId, pReadHead->version, ver, tstrerror(terrno), id); + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, 0x%" PRIx64, vgId, + pReadHead->version, ver, tstrerror(terrno), id); } else { - wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted, 0x%"PRIx64, + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 + ", since file corrupted, 0x%" PRIx64, vgId, pReadHead->version, ver, id); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } @@ -369,7 +368,7 @@ int32_t walFetchBody(SWalReader *pRead) { } if (pReadHead->version != ver) { - wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64", 0x%"PRIx64, vgId, + wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64 ", 0x%" PRIx64, vgId, pReadHead->version, ver, id); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; @@ -378,7 +377,8 @@ int32_t walFetchBody(SWalReader *pRead) { decryptBody(&pRead->pWal->cfg, pRead->pHead, plainBodyLen, __FUNCTION__); if (walValidBodyCksum(pRead->pHead) != 0) { - wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, 0x%" PRIx64, vgId, ver, id); + wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, 0x%" PRIx64, vgId, ver, + id); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -421,7 +421,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { if (contLen == sizeof(SWalCkHead)) { break; } else if (contLen == 0 && !seeked) { - if(walReadSeekVerImpl(pReader, ver) < 0){ + if (walReadSeekVerImpl(pReader, ver) < 0) { taosThreadMutexUnlock(&pReader->mutex); return -1; } @@ -451,15 +451,14 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { int32_t plainBodyLen = pReader->pHead->head.bodyLen; int32_t cryptedBodyLen = plainBodyLen; - - //TODO: dmchen emun - if(pReader->pWal->cfg.encryptAlgorithm == 1){ + + // TODO: dmchen emun + if (pReader->pWal->cfg.encryptAlgorithm == 1) { cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen); } if (pReader->capacity < cryptedBodyLen) { - SWalCkHead *ptr = - (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + cryptedBodyLen); + SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + cryptedBodyLen); if (ptr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosThreadMutexUnlock(&pReader->mutex); @@ -469,8 +468,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { pReader->capacity = cryptedBodyLen; } - if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, cryptedBodyLen)) != - cryptedBodyLen) { + if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, cryptedBodyLen)) != cryptedBodyLen) { if (contLen < 0) terrno = TAOS_SYSTEM_ERROR(errno); else { @@ -485,7 +483,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { if (pReader->pHead->head.version != ver) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver); -// pReader->curInvalid = 1; + // pReader->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; taosThreadMutexUnlock(&pReader->mutex); return -1; @@ -500,7 +498,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, plainBodyLen); uint32_t logCkSum = pReader->pHead->cksumBody; wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum); -// pReader->curInvalid = 1; + // pReader->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; taosThreadMutexUnlock(&pReader->mutex); return -1; @@ -512,8 +510,8 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { return 0; } -void decryptBody(SWalCfg* cfg, SWalCkHead* pHead, int32_t plainBodyLen, const char* func) { - //TODO: dmchen emun +void decryptBody(SWalCfg *cfg, SWalCkHead *pHead, int32_t plainBodyLen, const char *func) { + // TODO: dmchen emun if (cfg->encryptAlgorithm == 1) { int32_t cryptedBodyLen = ENCRYPTED_LEN(plainBodyLen); char *newBody = taosMemoryMalloc(cryptedBodyLen); @@ -527,7 +525,7 @@ void decryptBody(SWalCfg* cfg, SWalCkHead* pHead, int32_t plainBodyLen, const ch int32_t count = CBC_Decrypt(&opts); - //wDebug("CBC_Decrypt cryptedBodyLen:%d, plainBodyLen:%d, %s", count, plainBodyLen, func); + // wDebug("CBC_Decrypt cryptedBodyLen:%d, plainBodyLen:%d, %s", count, plainBodyLen, func); memcpy(pHead->head.body, newBody, plainBodyLen);