use new return macros for wal read
This commit is contained in:
parent
ac8e072275
commit
38689df0b9
|
@ -180,7 +180,6 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id);
|
||||||
void walCloseReader(SWalReader *pRead);
|
void walCloseReader(SWalReader *pRead);
|
||||||
void walReadReset(SWalReader *pReader);
|
void walReadReset(SWalReader *pReader);
|
||||||
int32_t walReadVer(SWalReader *pRead, int64_t ver);
|
int32_t walReadVer(SWalReader *pRead, int64_t ver);
|
||||||
void decryptBody(SWalCfg *cfg, SWalCkHead *pHead, int32_t plainBodyLen, const char *func);
|
|
||||||
int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver);
|
int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver);
|
||||||
int32_t walNextValidMsg(SWalReader *pRead);
|
int32_t walNextValidMsg(SWalReader *pRead);
|
||||||
int64_t walReaderGetCurrentVer(const SWalReader *pReader);
|
int64_t walReaderGetCurrentVer(const SWalReader *pReader);
|
||||||
|
|
|
@ -309,10 +309,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
code = walNextValidMsg(pReader);
|
TAOS_CHECK_RETURN(walNextValidMsg(pReader));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
SWalCont* pCont = &pReader->pHead->head;
|
SWalCont* pCont = &pReader->pHead->head;
|
||||||
int64_t ver = pCont->version;
|
int64_t ver = pCont->version;
|
||||||
|
|
|
@ -166,6 +166,8 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized);
|
||||||
int32_t walMetaDeserialize(SWal* pWal, const char* bytes);
|
int32_t walMetaDeserialize(SWal* pWal, const char* bytes);
|
||||||
// meta section end
|
// meta section end
|
||||||
|
|
||||||
|
void decryptBody(SWalCfg* cfg, SWalCkHead* pHead, int32_t plainBodyLen, const char* func);
|
||||||
|
|
||||||
int64_t walGetSeq();
|
int64_t walGetSeq();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -75,30 +75,24 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
||||||
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64,
|
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64,
|
||||||
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
|
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
|
||||||
if (fetchVer > appliedVer) {
|
if (fetchVer > appliedVer) {
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
while (fetchVer <= appliedVer) {
|
while (fetchVer <= appliedVer) {
|
||||||
if (walFetchHead(pReader, fetchVer) < 0) {
|
TAOS_CHECK_RETURN(walFetchHead(pReader, fetchVer));
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t type = pReader->pHead->head.msgType;
|
int32_t type = pReader->pHead->head.msgType;
|
||||||
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
|
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
|
||||||
(IS_META_MSG(type) && pReader->cond.scanMeta)) {
|
(IS_META_MSG(type) && pReader->cond.scanMeta)) {
|
||||||
int32_t code = walFetchBody(pReader);
|
TAOS_RETURN(walFetchBody(pReader));
|
||||||
return (code == TSDB_CODE_SUCCESS) ? 0 : -1;
|
|
||||||
} else {
|
} else {
|
||||||
if (walSkipFetchBody(pReader) < 0) {
|
TAOS_CHECK_RETURN(walSkipFetchBody(pReader));
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
fetchVer = pReader->curVersion;
|
fetchVer = pReader->curVersion;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; }
|
int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; }
|
||||||
|
@ -130,7 +124,7 @@ void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
|
static int32_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
|
||||||
int64_t ret = 0;
|
int64_t ret = 0;
|
||||||
|
|
||||||
TdFilePtr pIdxTFile = pReader->pIdxFile;
|
TdFilePtr pIdxTFile = pReader->pIdxFile;
|
||||||
|
@ -140,32 +134,34 @@ static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int
|
||||||
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
|
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
|
||||||
ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
|
ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
|
wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
|
||||||
ver, offset, terrstr());
|
ver, offset, terrstr());
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
}
|
}
|
||||||
SWalIdxEntry entry = {0};
|
SWalIdxEntry entry = {0};
|
||||||
if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) {
|
if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) {
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
wError("vgId:%d, failed to read idx file, since %s", pReader->pWal->cfg.vgId, terrstr());
|
wError("vgId:%d, failed to read idx file, since %s", pReader->pWal->cfg.vgId, terrstr());
|
||||||
|
|
||||||
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %ld",
|
wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %ld",
|
||||||
pReader->pWal->cfg.vgId, ret, sizeof(SWalIdxEntry));
|
pReader->pWal->cfg.vgId, ret, sizeof(SWalIdxEntry));
|
||||||
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
|
||||||
}
|
}
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
|
ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
|
wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
|
||||||
ver, entry.offset, terrstr());
|
ver, entry.offset, terrstr());
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
}
|
}
|
||||||
return ret;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
|
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
|
||||||
|
@ -177,9 +173,9 @@ static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
|
||||||
walBuildLogName(pReader->pWal, fileFirstVer, fnameStr);
|
walBuildLogName(pReader->pWal, fileFirstVer, fnameStr);
|
||||||
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
||||||
if (pLogFile == NULL) {
|
if (pLogFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
|
wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader->pLogFile = pLogFile;
|
pReader->pLogFile = pLogFile;
|
||||||
|
@ -187,16 +183,16 @@ static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
|
||||||
walBuildIdxName(pReader->pWal, fileFirstVer, fnameStr);
|
walBuildIdxName(pReader->pWal, fileFirstVer, fnameStr);
|
||||||
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
||||||
if (pIdxFile == NULL) {
|
if (pIdxFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
|
wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader->pIdxFile = pIdxFile;
|
pReader->pIdxFile = pIdxFile;
|
||||||
|
|
||||||
pReader->curFileFirstVer = fileFirstVer;
|
pReader->curFileFirstVer = fileFirstVer;
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
|
static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
|
||||||
|
@ -208,47 +204,43 @@ static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
|
||||||
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||||
if (pRet == NULL) {
|
if (pRet == NULL) {
|
||||||
wError("failed to find WAL log file with ver:%" PRId64, ver);
|
wError("failed to find WAL log file with ver:%" PRId64, ver);
|
||||||
terrno = TSDB_CODE_WAL_INVALID_VER;
|
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->curFileFirstVer != pRet->firstVer) {
|
if (pReader->curFileFirstVer != pRet->firstVer) {
|
||||||
// error code was set inner
|
// error code was set inner
|
||||||
if (walReadChangeFile(pReader, pRet->firstVer) < 0) {
|
TAOS_CHECK_RETURN(walReadChangeFile(pReader, pRet->firstVer));
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// error code was set inner
|
// error code was set inner
|
||||||
if (walReadSeekFilePos(pReader, pRet->firstVer, ver) < 0) {
|
TAOS_CHECK_RETURN(walReadSeekFilePos(pReader, pRet->firstVer, ver));
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver);
|
wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver);
|
||||||
|
|
||||||
pReader->curVersion = ver;
|
pReader->curVersion = ver;
|
||||||
return 0;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
|
int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
|
||||||
SWal *pWal = pReader->pWal;
|
SWal *pWal = pReader->pWal;
|
||||||
if (ver == pReader->curVersion) {
|
if (ver == pReader->curVersion) {
|
||||||
wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
|
wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
|
||||||
return 0;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
|
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
|
||||||
wInfo("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
|
wInfo("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
|
||||||
ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walReadSeekVerImpl(pReader, ver) < 0) {
|
TAOS_CHECK_RETURN(walReadSeekVerImpl(pReader, ver));
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
|
int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
|
||||||
|
@ -258,14 +250,12 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
|
||||||
|
|
||||||
// TODO: valid ver
|
// TODO: valid ver
|
||||||
if (ver > pRead->pWal->vers.commitVer) {
|
if (ver > pRead->pWal->vers.commitVer) {
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRead->curVersion != ver) {
|
if (pRead->curVersion != ver) {
|
||||||
code = walReaderSeekVer(pRead, ver);
|
TAOS_CHECK_RETURN(walReaderSeekVer(pRead, ver));
|
||||||
if (code < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
seeked = true;
|
seeked = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -274,18 +264,16 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
|
||||||
if (contLen == sizeof(SWalCkHead)) {
|
if (contLen == sizeof(SWalCkHead)) {
|
||||||
break;
|
break;
|
||||||
} else if (contLen == 0 && !seeked) {
|
} else if (contLen == 0 && !seeked) {
|
||||||
if (walReadSeekVerImpl(pRead, ver) < 0) {
|
TAOS_CHECK_RETURN(walReadSeekVerImpl(pRead, ver));
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
seeked = true;
|
seeked = true;
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
if (contLen < 0) {
|
if (contLen < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
|
||||||
}
|
}
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -294,11 +282,11 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, 0x%" PRIx64,
|
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, 0x%" PRIx64,
|
||||||
pRead->pWal->cfg.vgId, ver, pRead->readerId);
|
pRead->pWal->cfg.vgId, ver, pRead->readerId);
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walSkipFetchBody(SWalReader *pRead) {
|
int32_t walSkipFetchBody(SWalReader *pRead) {
|
||||||
|
@ -315,12 +303,12 @@ int32_t walSkipFetchBody(SWalReader *pRead) {
|
||||||
}
|
}
|
||||||
int64_t code = taosLSeekFile(pRead->pLogFile, cryptedBodyLen, SEEK_CUR);
|
int64_t code = taosLSeekFile(pRead->pLogFile, cryptedBodyLen, SEEK_CUR);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pRead->curVersion++;
|
pRead->curVersion++;
|
||||||
return 0;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walFetchBody(SWalReader *pRead) {
|
int32_t walFetchBody(SWalReader *pRead) {
|
||||||
|
@ -345,8 +333,7 @@ int32_t walFetchBody(SWalReader *pRead) {
|
||||||
if (pRead->capacity < cryptedBodyLen) {
|
if (pRead->capacity < cryptedBodyLen) {
|
||||||
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + cryptedBodyLen);
|
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + cryptedBodyLen);
|
||||||
if (ptr == NULL) {
|
if (ptr == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
pRead->pHead = ptr;
|
pRead->pHead = ptr;
|
||||||
pReadHead = &pRead->pHead->head;
|
pReadHead = &pRead->pHead->head;
|
||||||
|
@ -355,23 +342,24 @@ int32_t walFetchBody(SWalReader *pRead) {
|
||||||
|
|
||||||
if (cryptedBodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, cryptedBodyLen)) {
|
if (cryptedBodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, cryptedBodyLen)) {
|
||||||
if (plainBodyLen < 0) {
|
if (plainBodyLen < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, 0x%" PRIx64, vgId,
|
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, 0x%" PRIx64, vgId,
|
||||||
pReadHead->version, ver, tstrerror(terrno), id);
|
pReadHead->version, ver, tstrerror(terrno), id);
|
||||||
|
|
||||||
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
} else {
|
} else {
|
||||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64
|
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64
|
||||||
", since file corrupted, 0x%" PRIx64,
|
", since file corrupted, 0x%" PRIx64,
|
||||||
vgId, pReadHead->version, ver, id);
|
vgId, pReadHead->version, ver, id);
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
|
||||||
}
|
}
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReadHead->version != ver) {
|
if (pReadHead->version != ver) {
|
||||||
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64 ", 0x%" PRIx64, vgId,
|
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64 ", 0x%" PRIx64, vgId,
|
||||||
pReadHead->version, ver, id);
|
pReadHead->version, ver, id);
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
decryptBody(&pRead->pWal->cfg, pRead->pHead, plainBodyLen, __FUNCTION__);
|
decryptBody(&pRead->pWal->cfg, pRead->pHead, plainBodyLen, __FUNCTION__);
|
||||||
|
@ -379,40 +367,43 @@ int32_t walFetchBody(SWalReader *pRead) {
|
||||||
if (walValidBodyCksum(pRead->pHead) != 0) {
|
if (walValidBodyCksum(pRead->pHead) != 0) {
|
||||||
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, 0x%" PRIx64, vgId, ver,
|
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, 0x%" PRIx64, vgId, ver,
|
||||||
id);
|
id);
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
pRead->curVersion++;
|
pRead->curVersion++;
|
||||||
return 0;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walReadVer(SWalReader *pReader, int64_t ver) {
|
int32_t walReadVer(SWalReader *pReader, int64_t ver) {
|
||||||
wDebug("vgId:%d, wal start to read index:%" PRId64, pReader->pWal->cfg.vgId, ver);
|
wDebug("vgId:%d, wal start to read index:%" PRId64, pReader->pWal->cfg.vgId, ver);
|
||||||
int64_t contLen;
|
int64_t contLen;
|
||||||
int32_t code;
|
int32_t code = 0;
|
||||||
bool seeked = false;
|
bool seeked = false;
|
||||||
|
|
||||||
if (walIsEmpty(pReader->pWal)) {
|
if (walIsEmpty(pReader->pWal)) {
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ver > pReader->pWal->vers.lastVer || ver < pReader->pWal->vers.firstVer) {
|
if (ver > pReader->pWal->vers.lastVer || ver < pReader->pWal->vers.firstVer) {
|
||||||
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
|
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
|
||||||
ver, pReader->pWal->vers.firstVer, pReader->pWal->vers.lastVer);
|
ver, pReader->pWal->vers.firstVer, pReader->pWal->vers.lastVer);
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&pReader->mutex);
|
taosThreadMutexLock(&pReader->mutex);
|
||||||
|
|
||||||
if (pReader->curVersion != ver) {
|
if (pReader->curVersion != ver) {
|
||||||
if (walReaderSeekVer(pReader, ver) < 0) {
|
code = walReaderSeekVer(pReader, ver);
|
||||||
|
if (code) {
|
||||||
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
|
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
|
||||||
taosThreadMutexUnlock(&pReader->mutex);
|
taosThreadMutexUnlock(&pReader->mutex);
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
seeked = true;
|
seeked = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -421,22 +412,24 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
|
||||||
if (contLen == sizeof(SWalCkHead)) {
|
if (contLen == sizeof(SWalCkHead)) {
|
||||||
break;
|
break;
|
||||||
} else if (contLen == 0 && !seeked) {
|
} else if (contLen == 0 && !seeked) {
|
||||||
if (walReadSeekVerImpl(pReader, ver) < 0) {
|
code = walReadSeekVerImpl(pReader, ver);
|
||||||
|
if (code) {
|
||||||
taosThreadMutexUnlock(&pReader->mutex);
|
taosThreadMutexUnlock(&pReader->mutex);
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
seeked = true;
|
seeked = true;
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
if (contLen < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
} else {
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
}
|
|
||||||
wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s",
|
wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s",
|
||||||
pReader->pWal->cfg.vgId, ver, terrstr());
|
pReader->pWal->cfg.vgId, ver, terrstr());
|
||||||
taosThreadMutexUnlock(&pReader->mutex);
|
taosThreadMutexUnlock(&pReader->mutex);
|
||||||
return -1;
|
|
||||||
|
if (contLen < 0) {
|
||||||
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
|
} else {
|
||||||
|
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -444,9 +437,9 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId,
|
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId,
|
||||||
ver);
|
ver);
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
taosThreadMutexUnlock(&pReader->mutex);
|
taosThreadMutexUnlock(&pReader->mutex);
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t plainBodyLen = pReader->pHead->head.bodyLen;
|
int32_t plainBodyLen = pReader->pHead->head.bodyLen;
|
||||||
|
@ -460,33 +453,33 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
|
||||||
if (pReader->capacity < cryptedBodyLen) {
|
if (pReader->capacity < cryptedBodyLen) {
|
||||||
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + cryptedBodyLen);
|
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + cryptedBodyLen);
|
||||||
if (ptr == NULL) {
|
if (ptr == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
taosThreadMutexUnlock(&pReader->mutex);
|
taosThreadMutexUnlock(&pReader->mutex);
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
pReader->pHead = ptr;
|
pReader->pHead = ptr;
|
||||||
pReader->capacity = cryptedBodyLen;
|
pReader->capacity = cryptedBodyLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, cryptedBodyLen)) != cryptedBodyLen) {
|
if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, cryptedBodyLen)) != cryptedBodyLen) {
|
||||||
if (contLen < 0)
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
else {
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
}
|
|
||||||
wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s",
|
wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s",
|
||||||
pReader->pWal->cfg.vgId, ver, terrstr());
|
pReader->pWal->cfg.vgId, ver, terrstr());
|
||||||
taosThreadMutexUnlock(&pReader->mutex);
|
taosThreadMutexUnlock(&pReader->mutex);
|
||||||
return -1;
|
|
||||||
|
if (contLen < 0) {
|
||||||
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
|
} else {
|
||||||
|
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->pHead->head.version != ver) {
|
if (pReader->pHead->head.version != ver) {
|
||||||
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
|
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
|
||||||
pReader->pHead->head.version, ver);
|
pReader->pHead->head.version, ver);
|
||||||
// pReader->curInvalid = 1;
|
// pReader->curInvalid = 1;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
taosThreadMutexUnlock(&pReader->mutex);
|
taosThreadMutexUnlock(&pReader->mutex);
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
decryptBody(&pReader->pWal->cfg, pReader->pHead, plainBodyLen, __FUNCTION__);
|
decryptBody(&pReader->pWal->cfg, pReader->pHead, plainBodyLen, __FUNCTION__);
|
||||||
|
@ -499,15 +492,16 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
|
||||||
uint32_t logCkSum = pReader->pHead->cksumBody;
|
uint32_t logCkSum = pReader->pHead->cksumBody;
|
||||||
wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum);
|
wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum);
|
||||||
// pReader->curInvalid = 1;
|
// pReader->curInvalid = 1;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
taosThreadMutexUnlock(&pReader->mutex);
|
taosThreadMutexUnlock(&pReader->mutex);
|
||||||
return -1;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
|
||||||
}
|
}
|
||||||
pReader->curVersion++;
|
pReader->curVersion++;
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pReader->mutex);
|
taosThreadMutexUnlock(&pReader->mutex);
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
void decryptBody(SWalCfg *cfg, SWalCkHead *pHead, int32_t plainBodyLen, const char *func) {
|
void decryptBody(SWalCfg *cfg, SWalCkHead *pHead, int32_t plainBodyLen, const char *func) {
|
||||||
|
|
Loading…
Reference in New Issue