enh(wal): remove file failure handling
This commit is contained in:
parent
904ec81bf9
commit
0c942e622a
|
@ -77,11 +77,11 @@ typedef struct {
|
||||||
} SWalSyncInfo;
|
} SWalSyncInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t protoVer;
|
|
||||||
int64_t version;
|
int64_t version;
|
||||||
int16_t msgType;
|
int64_t ingestTs;
|
||||||
int32_t bodyLen;
|
int32_t bodyLen;
|
||||||
int64_t ingestTs; // not implemented
|
int16_t msgType;
|
||||||
|
int8_t protoVer;
|
||||||
|
|
||||||
// sync meta
|
// sync meta
|
||||||
SWalSyncInfo syncMeta;
|
SWalSyncInfo syncMeta;
|
||||||
|
|
|
@ -139,7 +139,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
const char* idxPattern = "^[0-9]+.idx$";
|
const char* idxPattern = "^[0-9]+.idx$";
|
||||||
regex_t logRegPattern;
|
regex_t logRegPattern;
|
||||||
regex_t idxRegPattern;
|
regex_t idxRegPattern;
|
||||||
SArray* pLogInfoArray = taosArrayInit(8, sizeof(SWalFileInfo));
|
SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
|
||||||
|
|
||||||
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
|
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
|
||||||
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
|
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
|
||||||
|
@ -159,7 +159,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
SWalFileInfo fileInfo;
|
SWalFileInfo fileInfo;
|
||||||
memset(&fileInfo, -1, sizeof(SWalFileInfo));
|
memset(&fileInfo, -1, sizeof(SWalFileInfo));
|
||||||
sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
|
sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
|
||||||
taosArrayPush(pLogInfoArray, &fileInfo);
|
taosArrayPush(actualLog, &fileInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,10 +167,10 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
regfree(&logRegPattern);
|
regfree(&logRegPattern);
|
||||||
regfree(&idxRegPattern);
|
regfree(&idxRegPattern);
|
||||||
|
|
||||||
taosArraySort(pLogInfoArray, compareWalFileInfo);
|
taosArraySort(actualLog, compareWalFileInfo);
|
||||||
|
|
||||||
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
|
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
int actualFileNum = taosArrayGetSize(pLogInfoArray);
|
int actualFileNum = taosArrayGetSize(actualLog);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
for (int32_t fileNo = actualFileNum - 1; fileNo >= 0; fileNo--) {
|
for (int32_t fileNo = actualFileNum - 1; fileNo >= 0; fileNo--) {
|
||||||
|
@ -196,11 +196,11 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
|
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
|
||||||
} else if (metaFileNum < actualFileNum) {
|
} else if (metaFileNum < actualFileNum) {
|
||||||
for (int i = metaFileNum; i < actualFileNum; i++) {
|
for (int i = metaFileNum; i < actualFileNum; i++) {
|
||||||
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
|
SWalFileInfo* pFileInfo = taosArrayGet(actualLog, i);
|
||||||
taosArrayPush(pWal->fileInfoSet, pFileInfo);
|
taosArrayPush(pWal->fileInfoSet, pFileInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pLogInfoArray);
|
taosArrayDestroy(actualLog);
|
||||||
|
|
||||||
pWal->writeCur = actualFileNum - 1;
|
pWal->writeCur = actualFileNum - 1;
|
||||||
if (actualFileNum > 0) {
|
if (actualFileNum > 0) {
|
||||||
|
@ -221,7 +221,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
|
|
||||||
int code = walSaveMeta(pWal);
|
int code = walSaveMeta(pWal);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
taosArrayDestroy(pLogInfoArray);
|
taosArrayDestroy(actualLog);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -423,37 +423,38 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
int32_t walReadVer(SWalReader *pReader, int64_t ver) {
|
||||||
wDebug("vgId:%d wal start to read ver %ld", pRead->pWal->cfg.vgId, ver);
|
wDebug("vgId:%d wal start to read ver %ld", pReader->pWal->cfg.vgId, ver);
|
||||||
int64_t contLen;
|
int64_t contLen;
|
||||||
|
int32_t code;
|
||||||
bool seeked = false;
|
bool seeked = false;
|
||||||
|
|
||||||
if (pRead->pWal->vers.firstVer == -1) {
|
if (pReader->pWal->vers.firstVer == -1) {
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
|
if (ver > pReader->pWal->vers.lastVer || ver < pReader->pWal->vers.firstVer) {
|
||||||
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
|
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
|
||||||
ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer);
|
ver, pReader->pWal->vers.firstVer, pReader->pWal->vers.lastVer);
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRead->curInvalid || pRead->curVersion != ver) {
|
if (pReader->curInvalid || pReader->curVersion != ver) {
|
||||||
if (walReadSeekVer(pRead, ver) < 0) {
|
if (walReadSeekVer(pReader, ver) < 0) {
|
||||||
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
|
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
seeked = true;
|
seeked = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
contLen = taosReadFile(pReader->pLogFile, pReader->pHead, sizeof(SWalCkHead));
|
||||||
if (contLen == sizeof(SWalCkHead)) {
|
if (contLen == sizeof(SWalCkHead)) {
|
||||||
break;
|
break;
|
||||||
} else if (contLen == 0 && !seeked) {
|
} else if (contLen == 0 && !seeked) {
|
||||||
walReadSeekVerImpl(pRead, ver);
|
walReadSeekVerImpl(pReader, ver);
|
||||||
seeked = true;
|
seeked = true;
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
|
@ -467,26 +468,26 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
contLen = walValidHeadCksum(pRead->pHead);
|
code = walValidHeadCksum(pReader->pHead);
|
||||||
if (contLen != 0) {
|
if (code != 0) {
|
||||||
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId,
|
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId,
|
||||||
ver);
|
ver);
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRead->capacity < pRead->pHead->head.bodyLen) {
|
if (pReader->capacity < pReader->pHead->head.bodyLen) {
|
||||||
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pRead->pHead->head.bodyLen);
|
void *ptr = taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen);
|
||||||
if (ptr == NULL) {
|
if (ptr == NULL) {
|
||||||
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pRead->pHead = ptr;
|
pReader->pHead = ptr;
|
||||||
pRead->capacity = pRead->pHead->head.bodyLen;
|
pReader->capacity = pReader->pHead->head.bodyLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((contLen = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
|
if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, pReader->pHead->head.bodyLen)) !=
|
||||||
pRead->pHead->head.bodyLen) {
|
pReader->pHead->head.bodyLen) {
|
||||||
if (contLen < 0)
|
if (contLen < 0)
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
else {
|
else {
|
||||||
|
@ -496,25 +497,28 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRead->pHead->head.version != ver) {
|
if (pReader->pHead->head.version != ver) {
|
||||||
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
|
||||||
pRead->pHead->head.version, ver);
|
pReader->pHead->head.version, ver);
|
||||||
pRead->curInvalid = 1;
|
pReader->curInvalid = 1;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
contLen = walValidBodyCksum(pRead->pHead);
|
code = walValidBodyCksum(pReader->pHead);
|
||||||
if (contLen != 0) {
|
if (code != 0) {
|
||||||
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
|
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId,
|
||||||
ver);
|
ver);
|
||||||
pRead->curInvalid = 1;
|
uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, pReader->pHead->head.bodyLen);
|
||||||
|
uint32_t logCkSum = pReader->pHead->cksumBody;
|
||||||
|
wError("checksum written into log: %u, checksum calculated: %u", logCkSum, readCkSum);
|
||||||
|
pReader->curInvalid = 1;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pRead->curVersion++;
|
pReader->curVersion++;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -289,18 +289,25 @@ int32_t walEndSnapshot(SWal *pWal) {
|
||||||
newTotSize -= iter->fileSize;
|
newTotSize -= iter->fileSize;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
char fnameStr[WAL_FILE_LEN];
|
int32_t actualDelete = 0;
|
||||||
|
char fnameStr[WAL_FILE_LEN];
|
||||||
// remove file
|
// remove file
|
||||||
for (int i = 0; i < deleteCnt; i++) {
|
for (int i = 0; i < deleteCnt; i++) {
|
||||||
pInfo = taosArrayGet(pWal->fileInfoSet, i);
|
pInfo = taosArrayGet(pWal->fileInfoSet, i);
|
||||||
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
|
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
|
||||||
taosRemoveFile(fnameStr);
|
if (taosRemoveFile(fnameStr) < 0) {
|
||||||
|
goto UPDATE_META;
|
||||||
|
}
|
||||||
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
|
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
|
||||||
taosRemoveFile(fnameStr);
|
if (taosRemoveFile(fnameStr) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
actualDelete++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
UPDATE_META:
|
||||||
// make new array, remove files
|
// make new array, remove files
|
||||||
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
|
taosArrayPopFrontBatch(pWal->fileInfoSet, actualDelete);
|
||||||
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
|
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
|
||||||
pWal->writeCur = -1;
|
pWal->writeCur = -1;
|
||||||
pWal->vers.firstVer = -1;
|
pWal->vers.firstVer = -1;
|
||||||
|
|
Loading…
Reference in New Issue