homework-jianmu/source/libs/wal/src/walRead.c

546 lines
17 KiB
C

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "taoserror.h"
#include "walInt.h"
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer);
static int32_t walFetchBodyNew(SWalReader *pRead);
static int32_t walSkipFetchBodyNew(SWalReader *pRead);
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
if (pReader == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pReader->pWal = pWal;
pReader->readerId = tGenIdPI64();
pReader->pIdxFile = NULL;
pReader->pLogFile = NULL;
pReader->curVersion = -1;
pReader->curFileFirstVer = -1;
pReader->curInvalid = 1;
pReader->capacity = 0;
if (cond) {
pReader->cond = *cond;
} else {
pReader->cond.scanUncommited = 0;
pReader->cond.scanNotApplied = 0;
pReader->cond.scanMeta = 0;
pReader->cond.enableRef = 0;
}
taosThreadMutexInit(&pReader->mutex, NULL);
pReader->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
if (pReader->pHead == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
taosMemoryFree(pReader);
return NULL;
}
/*if (pReader->cond.enableRef) {*/
/* taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/
/*}*/
return pReader;
}
void walCloseReader(SWalReader *pReader) {
taosCloseFile(&pReader->pIdxFile);
taosCloseFile(&pReader->pLogFile);
/*if (pReader->cond.enableRef) {*/
/*taosHashRemove(pReader->pWal->pRefHash, &pReader->readerId, sizeof(int64_t));*/
/*}*/
taosMemoryFreeClear(pReader->pHead);
taosMemoryFree(pReader);
}
int32_t walNextValidMsg(SWalReader *pReader) {
int64_t fetchVer = pReader->curVersion;
int64_t lastVer = walGetLastVer(pReader->pWal);
int64_t committedVer = walGetCommittedVer(pReader->pWal);
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
endVer = TMIN(appliedVer, endVer);
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
", applied index:%" PRId64 ", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
pReader->curStopped = 0;
while (fetchVer <= endVer) {
if (walFetchHeadNew(pReader, fetchVer) < 0) {
return -1;
}
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT ||
(IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) {
if (walFetchBodyNew(pReader) < 0) {
return -1;
}
return 0;
} else {
if (walSkipFetchBodyNew(pReader) < 0) {
return -1;
}
fetchVer++;
ASSERT(fetchVer == pReader->curVersion);
}
}
pReader->curStopped = 1;
return -1;
}
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
int64_t ret = 0;
TdFilePtr pIdxTFile = pReader->pIdxFile;
TdFilePtr pLogTFile = pReader->pLogFile;
// seek position
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
ver, offset, terrstr());
return -1;
}
SWalIdxEntry entry = {0};
if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) {
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, failed to read idx file, since %s", pReader->pWal->cfg.vgId, terrstr());
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %" PRIu64,
pReader->pWal->cfg.vgId, ret, sizeof(SWalIdxEntry));
}
return -1;
}
ASSERT(entry.ver == ver);
ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
ver, entry.offset, terrstr());
return -1;
}
return ret;
}
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
char fnameStr[WAL_FILE_LEN];
taosCloseFile(&pReader->pIdxFile);
taosCloseFile(&pReader->pLogFile);
walBuildLogName(pReader->pWal, fileFirstVer, fnameStr);
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_READ);
if (pLogFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
return -1;
}
pReader->pLogFile = pLogFile;
walBuildIdxName(pReader->pWal, fileFirstVer, fnameStr);
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ);
if (pIdxFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
return -1;
}
pReader->pIdxFile = pIdxFile;
pReader->curFileFirstVer = fileFirstVer;
return 0;
}
int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
SWal *pWal = pReader->pWal;
// bsearch in fileSet
SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
if (pRet == NULL) {
wError("failed to find WAL log file with ver:%lld", ver);
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1;
}
if (pReader->curFileFirstVer != pRet->firstVer) {
// error code was set inner
if (walReadChangeFile(pReader, pRet->firstVer) < 0) {
return -1;
}
}
// error code was set inner
if (walReadSeekFilePos(pReader, pRet->firstVer, ver) < 0) {
return -1;
}
wDebug("vgId:%d, wal version reset from index:%" PRId64 "(invalid:%d) to index:%" PRId64, pReader->pWal->cfg.vgId,
pReader->curVersion, pReader->curInvalid, ver);
pReader->curVersion = ver;
return 0;
}
int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
SWal *pWal = pReader->pWal;
if (!pReader->curInvalid && ver == pReader->curVersion) {
wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
return 0;
}
pReader->curInvalid = 1;
pReader->curVersion = ver;
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
ver, pWal->vers.firstVer, pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
if (ver < pWal->vers.snapshotVer) {
}
if (walReadSeekVerImpl(pReader, ver) < 0) {
return -1;
}
return 0;
}
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity = capacity; }
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
int64_t contLen;
bool seeked = false;
wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer);
if (pRead->curInvalid || pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) {
ASSERT(0);
pRead->curVersion = fetchVer;
pRead->curInvalid = 1;
return -1;
}
seeked = true;
}
while (1) {
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (contLen == sizeof(SWalCkHead)) {
break;
} else if (contLen == 0 && !seeked) {
walReadSeekVerImpl(pRead, fetchVer);
seeked = true;
continue;
} else {
if (contLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
ASSERT(0);
pRead->curInvalid = 1;
return -1;
}
}
pRead->curInvalid = 0;
return 0;
}
static int32_t walFetchBodyNew(SWalReader *pRead) {
SWalCont *pReadHead = &pRead->pHead->head;
int64_t ver = pReadHead->version;
wDebug("vgId:%d, wal starts to fetch body, index:%" PRId64, pRead->pWal->cfg.vgId, ver);
if (pRead->capacity < pReadHead->bodyLen) {
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
}
pRead->pHead = (SWalCkHead *)ptr;
pReadHead = &pRead->pHead->head;
pRead->capacity = pReadHead->bodyLen;
}
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
if (pReadHead->bodyLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver, tstrerror(terrno));
} else {
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
pRead->curInvalid = 1;
ASSERT(0);
return -1;
}
if (pReadHead->version != ver) {
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pRead->pHead->head.version, ver);
pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1;
}
if (walValidBodyCksum(pRead->pHead) != 0) {
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1;
}
wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pRead->pWal->cfg.vgId, ver);
pRead->curVersion = ver + 1;
return 0;
}
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
int64_t code;
ASSERT(pRead->curVersion == pRead->pHead->head.version);
ASSERT(pRead->curInvalid == 0);
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curInvalid = 1;
ASSERT(0);
return -1;
}
pRead->curVersion++;
wDebug("vgId:%d, version advance to %" PRId64 ", skip fetch", pRead->pWal->cfg.vgId, pRead->curVersion);
return 0;
}
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int64_t code;
// TODO: valid ver
if (ver > pRead->pWal->vers.commitVer) {
return -1;
}
if (pRead->curInvalid || pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver);
if (code < 0) return -1;
}
ASSERT(taosValidFile(pRead->pLogFile) == true);
code = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
if (code != sizeof(SWalCkHead)) {
return -1;
}
code = walValidHeadCksum(pHead);
if (code != 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
return 0;
}
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
int64_t code;
// ASSERT(pRead->curVersion == pHead->head.version);
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curInvalid = 1;
return -1;
}
pRead->curVersion++;
return 0;
}
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
SWalCont *pReadHead = &((*ppHead)->head);
int64_t ver = pReadHead->version;
if (pRead->capacity < pReadHead->bodyLen) {
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
}
*ppHead = (SWalCkHead *)ptr;
pReadHead = &((*ppHead)->head);
pRead->capacity = pReadHead->bodyLen;
}
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
ASSERT(0);
return -1;
}
if (pReadHead->version != ver) {
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pRead->pHead->head.version, ver);
pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
if (walValidBodyCksum(*ppHead) != 0) {
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
ver);
pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
pRead->curVersion = ver + 1;
return 0;
}
int32_t walReadVer(SWalReader *pReader, int64_t ver) {
wDebug("vgId:%d, wal start to read index:%" PRId64, pReader->pWal->cfg.vgId, ver);
int64_t contLen;
int32_t code;
bool seeked = false;
if (pReader->pWal->vers.firstVer == -1) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
if (ver > pReader->pWal->vers.lastVer || ver < pReader->pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
ver, pReader->pWal->vers.firstVer, pReader->pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
taosThreadMutexLock(&pReader->mutex);
if (pReader->curInvalid || pReader->curVersion != ver) {
if (walReadSeekVer(pReader, ver) < 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
taosThreadMutexUnlock(&pReader->mutex);
return -1;
}
seeked = true;
}
while (1) {
contLen = taosReadFile(pReader->pLogFile, pReader->pHead, sizeof(SWalCkHead));
if (contLen == sizeof(SWalCkHead)) {
break;
} else if (contLen == 0 && !seeked) {
walReadSeekVerImpl(pReader, ver);
seeked = true;
continue;
} else {
if (contLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
wError("vgId:%d, failed to read WAL record head from log file since %s", pReader->pWal->cfg.vgId, terrstr());
taosThreadMutexUnlock(&pReader->mutex);
return -1;
}
}
code = walValidHeadCksum(pReader->pHead);
if (code != 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId,
ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosThreadMutexUnlock(&pReader->mutex);
return -1;
}
if (pReader->capacity < pReader->pHead->head.bodyLen) {
void *ptr = taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
taosThreadMutexUnlock(&pReader->mutex);
return -1;
}
pReader->pHead = (SWalCkHead *)ptr;
pReader->capacity = pReader->pHead->head.bodyLen;
}
if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, pReader->pHead->head.bodyLen)) !=
pReader->pHead->head.bodyLen) {
if (contLen < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
wError("vgId:%d, failed to read WAL record body from log file since %s", pReader->pWal->cfg.vgId, terrstr());
taosThreadMutexUnlock(&pReader->mutex);
return -1;
}
if (pReader->pHead->head.version != ver) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
pReader->pHead->head.version, ver);
pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
taosThreadMutexUnlock(&pReader->mutex);
return -1;
}
code = walValidBodyCksum(pReader->pHead);
if (code != 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId,
ver);
uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, pReader->pHead->head.bodyLen);
uint32_t logCkSum = pReader->pHead->cksumBody;
wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum);
pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
taosThreadMutexUnlock(&pReader->mutex);
return -1;
}
pReader->curVersion++;
taosThreadMutexUnlock(&pReader->mutex);
return 0;
}