add wal handle meta corrupt
This commit is contained in:
parent
3d7b737bbc
commit
47f1558e7c
|
@ -71,6 +71,7 @@ extern int32_t wDebugFlag;
|
||||||
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
|
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
|
||||||
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
||||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||||
|
#define WAL_MAGIC 0xFAFBFCFDULL
|
||||||
|
|
||||||
#define WAL_CUR_FAILED 1
|
#define WAL_CUR_FAILED 1
|
||||||
|
|
||||||
|
@ -98,6 +99,7 @@ typedef struct {
|
||||||
} SWalCfg;
|
} SWalCfg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
uint64_t magic;
|
||||||
uint32_t cksumHead;
|
uint32_t cksumHead;
|
||||||
uint32_t cksumBody;
|
uint32_t cksumBody;
|
||||||
SWalReadHead head;
|
SWalReadHead head;
|
||||||
|
|
|
@ -17,10 +17,10 @@
|
||||||
#define _TD_WAL_INT_H_
|
#define _TD_WAL_INT_H_
|
||||||
|
|
||||||
#include "compare.h"
|
#include "compare.h"
|
||||||
#include "tchecksum.h"
|
|
||||||
#include "wal.h"
|
|
||||||
|
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
#include "tchecksum.h"
|
||||||
|
#include "tcoding.h"
|
||||||
|
#include "wal.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -40,6 +40,19 @@ typedef struct WalIdxEntry {
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
} SWalIdxEntry;
|
} SWalIdxEntry;
|
||||||
|
|
||||||
|
static inline int tSerializeWalIdxEntry(void** buf, SWalIdxEntry* pIdxEntry) {
|
||||||
|
int tlen;
|
||||||
|
tlen += taosEncodeFixedI64(buf, pIdxEntry->ver);
|
||||||
|
tlen += taosEncodeFixedI64(buf, pIdxEntry->offset);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void* tDeserializeWalIdxEntry(void* buf, SWalIdxEntry* pIdxEntry) {
|
||||||
|
buf = taosDecodeFixedI64(buf, &pIdxEntry->ver);
|
||||||
|
buf = taosDecodeFixedI64(buf, &pIdxEntry->offset);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) {
|
static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) {
|
||||||
SWalFileInfo* pInfoLeft = (SWalFileInfo*)pLeft;
|
SWalFileInfo* pInfoLeft = (SWalFileInfo*)pLeft;
|
||||||
SWalFileInfo* pInfoRight = (SWalFileInfo*)pRight;
|
SWalFileInfo* pInfoRight = (SWalFileInfo*)pRight;
|
||||||
|
@ -130,12 +143,12 @@ int walMetaDeserialize(SWal* pWal, const char* bytes);
|
||||||
// meta section end
|
// meta section end
|
||||||
|
|
||||||
// seek section
|
// seek section
|
||||||
int walChangeFile(SWal* pWal, int64_t ver);
|
int walChangeWrite(SWal* pWal, int64_t ver);
|
||||||
int walChangeFileToLast(SWal* pWal);
|
int walSetWrite(SWal* pWal);
|
||||||
// seek section end
|
// seek section end
|
||||||
|
|
||||||
int64_t walGetSeq();
|
int64_t walGetSeq();
|
||||||
int walSeekVer(SWal* pWal, int64_t ver);
|
int walSeekWriteVer(SWal* pWal, int64_t ver);
|
||||||
int walRoll(SWal* pWal);
|
int walRoll(SWal* pWal);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
#include "cJSON.h"
|
#include "cJSON.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tfile.h"
|
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
#include "walInt.h"
|
#include "walInt.h"
|
||||||
|
|
||||||
|
@ -34,13 +33,74 @@ static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
|
||||||
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
|
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline int64_t walScanLogGetLastVer(SWal* pWal) {
|
||||||
|
ASSERT(pWal->fileInfoSet != NULL);
|
||||||
|
int sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
|
ASSERT(sz > 0);
|
||||||
|
for (int i = 0; i < sz; i++) {
|
||||||
|
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
|
||||||
|
|
||||||
|
}
|
||||||
|
SWalFileInfo *pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz-1);
|
||||||
|
char fnameStr[WAL_FILE_LEN];
|
||||||
|
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
|
||||||
|
|
||||||
|
struct stat statbuf;
|
||||||
|
stat(fnameStr, &statbuf);
|
||||||
|
int readSize = MIN(WAL_MAX_SIZE, statbuf.st_size);
|
||||||
|
|
||||||
|
FileFd fd = taosOpenFileRead(fnameStr);
|
||||||
|
if (fd < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t magic = WAL_MAGIC;
|
||||||
|
|
||||||
|
char* buf = malloc(readSize + 5);
|
||||||
|
if (buf == NULL) {
|
||||||
|
taosCloseFile(fd);
|
||||||
|
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (readSize != taosReadFile(fd, buf, readSize)) {
|
||||||
|
free(buf);
|
||||||
|
taosCloseFile(fd);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* found = strstr(buf, (const char*)&magic);
|
||||||
|
if (found == NULL) {
|
||||||
|
ASSERT(false);
|
||||||
|
// file has to be deleted
|
||||||
|
free(buf);
|
||||||
|
taosCloseFile(fd);
|
||||||
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
char *another;
|
||||||
|
while((another = strstr(found + 1, (const char*)&magic)) != NULL) {
|
||||||
|
// read and validate
|
||||||
|
SWalHead *logContent = (SWalHead*)another;
|
||||||
|
if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
|
||||||
|
found = another;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosCloseFile(fd);
|
||||||
|
SWalHead *lastEntry = (SWalHead*)found;
|
||||||
|
|
||||||
|
return lastEntry->head.version;
|
||||||
|
}
|
||||||
|
|
||||||
int walCheckAndRepairMeta(SWal* pWal) {
|
int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
// load log files, get first/snapshot/last version info
|
// load log files, get first/snapshot/last version info
|
||||||
const char* logPattern = "^[0-9]+.log$";
|
const char* logPattern = "^[0-9]+.log$";
|
||||||
const char* idxPattern = "^[0-9]+.idx$";
|
const char* idxPattern = "^[0-9]+.idx$";
|
||||||
regex_t logRegPattern;
|
regex_t logRegPattern;
|
||||||
regex_t idxRegPattern;
|
regex_t idxRegPattern;
|
||||||
SArray* pLogArray = taosArrayInit(8, sizeof(int64_t));
|
SArray* pLogInfoArray = taosArrayInit(8, sizeof(SWalFileInfo));
|
||||||
|
|
||||||
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
|
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
|
||||||
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
|
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
|
||||||
|
@ -51,19 +111,84 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// scan log files and build new meta
|
||||||
struct dirent* ent;
|
struct dirent* ent;
|
||||||
while ((ent = readdir(dir)) != NULL) {
|
while ((ent = readdir(dir)) != NULL) {
|
||||||
char* name = basename(ent->d_name);
|
char* name = basename(ent->d_name);
|
||||||
int code = regexec(&logRegPattern, name, 0, NULL, 0);
|
int code = regexec(&logRegPattern, name, 0, NULL, 0);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
int64_t firstVer;
|
SWalFileInfo fileInfo;
|
||||||
sscanf(name, "%" PRId64 ".log", &firstVer);
|
memset(&fileInfo, -1, sizeof(SWalFileInfo));
|
||||||
taosArrayPush(pLogArray, &firstVer);
|
sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
|
||||||
|
FileFd fd = taosOpenFileRead(ent->d_name);
|
||||||
|
//get lastVer
|
||||||
|
//get size
|
||||||
|
taosArrayPush(pLogInfoArray, &fileInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// load meta
|
regfree(&logRegPattern);
|
||||||
// if not match, or meta missing
|
regfree(&idxRegPattern);
|
||||||
|
|
||||||
|
taosArraySort(pLogInfoArray, compareWalFileInfo);
|
||||||
|
int oldSz = 0;
|
||||||
|
if (pWal->fileInfoSet) {
|
||||||
|
oldSz = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
|
}
|
||||||
|
int newSz = taosArrayGetSize(pLogInfoArray);
|
||||||
|
// case 1. meta file not exist / cannot be parsed
|
||||||
|
if (pWal->fileInfoSet == NULL && newSz != 0) {
|
||||||
|
// recover fileInfo set
|
||||||
|
pWal->fileInfoSet = pLogInfoArray;
|
||||||
|
if (newSz != 0) {
|
||||||
|
// recover meta version
|
||||||
|
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pLogInfoArray, 0))->firstVer;
|
||||||
|
pWal->writeCur = newSz - 1;
|
||||||
|
}
|
||||||
|
// recover file size
|
||||||
|
} else if (oldSz < newSz) {
|
||||||
|
for (int i = oldSz; i < newSz; i++) {
|
||||||
|
SWalFileInfo *pFileInfo = taosArrayGet(pLogInfoArray, i);
|
||||||
|
taosArrayPush(pWal->fileInfoSet, pFileInfo);
|
||||||
|
}
|
||||||
|
pWal->writeCur = newSz - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pWal->fileInfoSet && taosArrayGetSize(pWal->fileInfoSet) != 0) {
|
||||||
|
pWal->vers.lastVer = walScanLogGetLastVer(pWal);
|
||||||
|
ASSERT(pWal->vers.lastVer != -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// case 2. versions in meta not match log
|
||||||
|
// or some log not included in meta
|
||||||
|
// (e.g. program killed)
|
||||||
|
//
|
||||||
|
// case 3. other corrupt cases
|
||||||
|
//
|
||||||
|
#if 0
|
||||||
|
int sz = taosArrayGetSize(pLogInfoArray);
|
||||||
|
for (int i = 0; i < sz; i++) {
|
||||||
|
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
|
||||||
|
if (i == 0 && pFileInfo->firstVer != walGetFirstVer(pWal)) {
|
||||||
|
//repair
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i > 0) {
|
||||||
|
SWalFileInfo* pLastFileInfo = taosArrayGet(pLogInfoArray, i-1);
|
||||||
|
if (pLastFileInfo->lastVer != pFileInfo->firstVer) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int code = walSaveMeta(pWal);
|
||||||
|
if (code < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// get last version of this file
|
||||||
|
//
|
||||||
// rebuild meta
|
// rebuild meta
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -87,6 +212,7 @@ int walRollFileInfo(SWal* pWal) {
|
||||||
// TODO: change to emplace back
|
// TODO: change to emplace back
|
||||||
SWalFileInfo* pNewInfo = malloc(sizeof(SWalFileInfo));
|
SWalFileInfo* pNewInfo = malloc(sizeof(SWalFileInfo));
|
||||||
if (pNewInfo == NULL) {
|
if (pNewInfo == NULL) {
|
||||||
|
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pNewInfo->firstVer = pWal->vers.lastVer + 1;
|
pNewInfo->firstVer = pWal->vers.lastVer + 1;
|
||||||
|
@ -94,7 +220,7 @@ int walRollFileInfo(SWal* pWal) {
|
||||||
pNewInfo->createTs = ts;
|
pNewInfo->createTs = ts;
|
||||||
pNewInfo->closeTs = -1;
|
pNewInfo->closeTs = -1;
|
||||||
pNewInfo->fileSize = 0;
|
pNewInfo->fileSize = 0;
|
||||||
taosArrayPush(pWal->fileInfoSet, pNewInfo);
|
taosArrayPush(pArray, pNewInfo);
|
||||||
free(pNewInfo);
|
free(pNewInfo);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -108,7 +234,16 @@ char* walMetaSerialize(SWal* pWal) {
|
||||||
cJSON* pFiles = cJSON_CreateArray();
|
cJSON* pFiles = cJSON_CreateArray();
|
||||||
cJSON* pField;
|
cJSON* pField;
|
||||||
if (pRoot == NULL || pMeta == NULL || pFiles == NULL) {
|
if (pRoot == NULL || pMeta == NULL || pFiles == NULL) {
|
||||||
// TODO
|
if(pRoot) {
|
||||||
|
cJSON_Delete(pRoot);
|
||||||
|
}
|
||||||
|
if(pMeta) {
|
||||||
|
cJSON_Delete(pMeta);
|
||||||
|
}
|
||||||
|
if(pFiles) {
|
||||||
|
cJSON_Delete(pFiles);
|
||||||
|
}
|
||||||
|
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
cJSON_AddItemToObject(pRoot, "meta", pMeta);
|
cJSON_AddItemToObject(pRoot, "meta", pMeta);
|
||||||
|
@ -221,18 +356,18 @@ int walSaveMeta(SWal* pWal) {
|
||||||
int metaVer = walFindCurMetaVer(pWal);
|
int metaVer = walFindCurMetaVer(pWal);
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
walBuildMetaName(pWal, metaVer + 1, fnameStr);
|
walBuildMetaName(pWal, metaVer + 1, fnameStr);
|
||||||
int metaTfd = tfOpenCreateWrite(fnameStr);
|
FileFd metaFd = taosOpenFileCreateWrite(fnameStr);
|
||||||
if (metaTfd < 0) {
|
if (metaFd < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
char* serialized = walMetaSerialize(pWal);
|
char* serialized = walMetaSerialize(pWal);
|
||||||
int len = strlen(serialized);
|
int len = strlen(serialized);
|
||||||
if (len != tfWrite(metaTfd, serialized, len)) {
|
if (len != taosWriteFile(metaFd, serialized, len)) {
|
||||||
// TODO:clean file
|
// TODO:clean file
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tfClose(metaTfd);
|
taosCloseFile(metaFd);
|
||||||
// delete old file
|
// delete old file
|
||||||
if (metaVer > -1) {
|
if (metaVer > -1) {
|
||||||
walBuildMetaName(pWal, metaVer, fnameStr);
|
walBuildMetaName(pWal, metaVer, fnameStr);
|
||||||
|
@ -247,7 +382,7 @@ int walLoadMeta(SWal* pWal) {
|
||||||
// find existing meta file
|
// find existing meta file
|
||||||
int metaVer = walFindCurMetaVer(pWal);
|
int metaVer = walFindCurMetaVer(pWal);
|
||||||
if (metaVer == -1) {
|
if (metaVer == -1) {
|
||||||
return 0;
|
return -1;
|
||||||
}
|
}
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
walBuildMetaName(pWal, metaVer, fnameStr);
|
walBuildMetaName(pWal, metaVer, fnameStr);
|
||||||
|
@ -257,23 +392,20 @@ int walLoadMeta(SWal* pWal) {
|
||||||
int size = statbuf.st_size;
|
int size = statbuf.st_size;
|
||||||
char* buf = malloc(size + 5);
|
char* buf = malloc(size + 5);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
|
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
memset(buf, 0, size + 5);
|
memset(buf, 0, size + 5);
|
||||||
int tfd = tfOpenRead(fnameStr);
|
FileFd fd = taosOpenFileRead(fnameStr);
|
||||||
if (tfRead(tfd, buf, size) != size) {
|
if (taosReadFile(fd, buf, size) != size) {
|
||||||
tfClose(tfd);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
taosCloseFile(fd);
|
||||||
free(buf);
|
free(buf);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// load into fileInfoSet
|
// load into fileInfoSet
|
||||||
int code = walMetaDeserialize(pWal, buf);
|
int code = walMetaDeserialize(pWal, buf);
|
||||||
if (code != 0) {
|
taosCloseFile(fd);
|
||||||
tfClose(tfd);
|
|
||||||
free(buf);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
tfClose(tfd);
|
|
||||||
free(buf);
|
free(buf);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
#include "walInt.h"
|
#include "walInt.h"
|
||||||
|
|
||||||
static int walSeekFilePos(SWal* pWal, int64_t ver) {
|
static int walSeekWritePos(SWal* pWal, int64_t ver) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
||||||
int64_t idxTfd = pWal->writeIdxTfd;
|
int64_t idxTfd = pWal->writeIdxTfd;
|
||||||
|
@ -41,7 +41,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
ASSERT(entry.ver == ver);
|
ASSERT(entry.ver == ver);
|
||||||
code = tfLseek(logTfd, entry.offset, SEEK_CUR);
|
code = tfLseek(logTfd, entry.offset, SEEK_SET);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -49,7 +49,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int walChangeFileToLast(SWal* pWal) {
|
int walSetWrite(SWal* pWal) {
|
||||||
int64_t idxTfd, logTfd;
|
int64_t idxTfd, logTfd;
|
||||||
SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
|
SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
|
||||||
ASSERT(pRet != NULL);
|
ASSERT(pRet != NULL);
|
||||||
|
@ -57,13 +57,13 @@ int walChangeFileToLast(SWal* pWal) {
|
||||||
|
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
||||||
idxTfd = tfOpenReadWrite(fnameStr);
|
idxTfd = tfOpenCreateWriteAppend(fnameStr);
|
||||||
if (idxTfd < 0) {
|
if (idxTfd < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
||||||
logTfd = tfOpenReadWrite(fnameStr);
|
logTfd = tfOpenCreateWriteAppend(fnameStr);
|
||||||
if (logTfd < 0) {
|
if (logTfd < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -74,46 +74,57 @@ int walChangeFileToLast(SWal* pWal) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int walChangeFile(SWal* pWal, int64_t ver) {
|
int walChangeWrite(SWal* pWal, int64_t ver) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
int64_t idxTfd, logTfd;
|
int64_t idxTfd, logTfd;
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
code = tfClose(pWal->writeLogTfd);
|
if (pWal->writeLogTfd != -1) {
|
||||||
if (code != 0) {
|
code = tfClose(pWal->writeLogTfd);
|
||||||
// TODO
|
if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
code = tfClose(pWal->writeIdxTfd);
|
if (pWal->writeIdxTfd != -1) {
|
||||||
if (code != 0) {
|
code = tfClose(pWal->writeIdxTfd);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
if (code != 0) {
|
||||||
return -1;
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SWalFileInfo tmpInfo;
|
SWalFileInfo tmpInfo;
|
||||||
tmpInfo.firstVer = ver;
|
tmpInfo.firstVer = ver;
|
||||||
// bsearch in fileSet
|
// bsearch in fileSet
|
||||||
SWalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
int32_t idx = taosArraySearchIdx(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||||
ASSERT(pRet != NULL);
|
ASSERT(idx != -1);
|
||||||
int64_t fileFirstVer = pRet->firstVer;
|
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, idx);
|
||||||
// closed
|
/*ASSERT(pFileInfo != NULL);*/
|
||||||
if (taosArrayGetLast(pWal->fileInfoSet) != pRet) {
|
|
||||||
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
int64_t fileFirstVer = pFileInfo->firstVer;
|
||||||
idxTfd = tfOpenRead(fnameStr);
|
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
||||||
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
idxTfd = tfOpenCreateWriteAppend(fnameStr);
|
||||||
logTfd = tfOpenRead(fnameStr);
|
if (idxTfd < 0) {
|
||||||
} else {
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
pWal->writeIdxTfd = -1;
|
||||||
idxTfd = tfOpenReadWrite(fnameStr);
|
return -1;
|
||||||
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
}
|
||||||
logTfd = tfOpenReadWrite(fnameStr);
|
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
||||||
|
logTfd = tfOpenCreateWriteAppend(fnameStr);
|
||||||
|
if (logTfd < 0) {
|
||||||
|
tfClose(idxTfd);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
pWal->writeLogTfd = -1;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pWal->writeLogTfd = logTfd;
|
pWal->writeLogTfd = logTfd;
|
||||||
pWal->writeIdxTfd = idxTfd;
|
pWal->writeIdxTfd = idxTfd;
|
||||||
|
pWal->writeCur = idx;
|
||||||
return fileFirstVer;
|
return fileFirstVer;
|
||||||
}
|
}
|
||||||
|
|
||||||
int walSeekVer(SWal* pWal, int64_t ver) {
|
int walSeekWriteVer(SWal* pWal, int64_t ver) {
|
||||||
int code;
|
int code;
|
||||||
if (ver == pWal->vers.lastVer) {
|
if (ver == pWal->vers.lastVer) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -123,14 +134,15 @@ int walSeekVer(SWal* pWal, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (ver < pWal->vers.snapshotVer) {
|
if (ver < pWal->vers.snapshotVer) {
|
||||||
|
|
||||||
}
|
}
|
||||||
if (ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
|
if (ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
|
||||||
code = walChangeFile(pWal, ver);
|
code = walChangeWrite(pWal, ver);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
code = walSeekFilePos(pWal, ver);
|
code = walSeekWritePos(pWal, ver);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,12 +46,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||||
|
|
||||||
// find correct file
|
// find correct file
|
||||||
if (ver < walGetLastFileFirstVer(pWal)) {
|
if (ver < walGetLastFileFirstVer(pWal)) {
|
||||||
// close current files
|
// change current files
|
||||||
tfClose(pWal->writeIdxTfd);
|
code = walChangeWrite(pWal, ver);
|
||||||
tfClose(pWal->writeLogTfd);
|
if (code < 0) {
|
||||||
// open old files
|
|
||||||
code = walChangeFile(pWal, ver);
|
|
||||||
if (code != 0) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,7 +163,8 @@ int32_t walEndSnapshot(SWal *pWal) {
|
||||||
}
|
}
|
||||||
// iterate files, until the searched result
|
// iterate files, until the searched result
|
||||||
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
|
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
|
||||||
if (pWal->totSize > pWal->cfg.retentionSize || iter->closeTs + pWal->cfg.retentionPeriod > ts) {
|
if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize)
|
||||||
|
|| (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) {
|
||||||
// delete according to file size or close time
|
// delete according to file size or close time
|
||||||
deleteCnt++;
|
deleteCnt++;
|
||||||
newTotSize -= iter->fileSize;
|
newTotSize -= iter->fileSize;
|
||||||
|
@ -191,13 +189,12 @@ int32_t walEndSnapshot(SWal *pWal) {
|
||||||
pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
|
pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
|
||||||
}
|
}
|
||||||
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
||||||
;
|
|
||||||
pWal->totSize = newTotSize;
|
pWal->totSize = newTotSize;
|
||||||
pWal->vers.verInSnapshotting = -1;
|
pWal->vers.verInSnapshotting = -1;
|
||||||
|
|
||||||
// save snapshot ver, commit ver
|
// save snapshot ver, commit ver
|
||||||
int code = walSaveMeta(pWal);
|
int code = walSaveMeta(pWal);
|
||||||
if (code != 0) {
|
if (code < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,18 +222,17 @@ int walRoll(SWal *pWal) {
|
||||||
walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
|
walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
|
||||||
idxTfd = tfOpenCreateWriteAppend(fnameStr);
|
idxTfd = tfOpenCreateWriteAppend(fnameStr);
|
||||||
if (idxTfd < 0) {
|
if (idxTfd < 0) {
|
||||||
ASSERT(0);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
|
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
|
||||||
logTfd = tfOpenCreateWriteAppend(fnameStr);
|
logTfd = tfOpenCreateWriteAppend(fnameStr);
|
||||||
if (logTfd < 0) {
|
if (logTfd < 0) {
|
||||||
ASSERT(0);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
code = walRollFileInfo(pWal);
|
code = walRollFileInfo(pWal);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,8 +287,11 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
|
||||||
ASSERT(pWal->writeCur >= 0);
|
ASSERT(pWal->writeCur >= 0);
|
||||||
|
|
||||||
pthread_mutex_lock(&pWal->mutex);
|
pthread_mutex_lock(&pWal->mutex);
|
||||||
|
|
||||||
if (pWal->writeIdxTfd == -1 || pWal->writeLogTfd == -1) {
|
if (pWal->writeIdxTfd == -1 || pWal->writeLogTfd == -1) {
|
||||||
walChangeFileToLast(pWal);
|
walSetWrite(pWal);
|
||||||
|
tfLseek(pWal->writeLogTfd, 0, SEEK_END);
|
||||||
|
tfLseek(pWal->writeIdxTfd, 0, SEEK_END);
|
||||||
}
|
}
|
||||||
|
|
||||||
pWal->writeHead.head.version = index;
|
pWal->writeHead.head.version = index;
|
||||||
|
|
|
@ -107,6 +107,43 @@ class WalKeepEnv : public ::testing::Test {
|
||||||
const char* pathName = "/tmp/wal_test";
|
const char* pathName = "/tmp/wal_test";
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class WalRetentionEnv : public ::testing::Test {
|
||||||
|
protected:
|
||||||
|
static void SetUpTestCase() {
|
||||||
|
int code = walInit();
|
||||||
|
ASSERT(code == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void TearDownTestCase() { walCleanUp(); }
|
||||||
|
|
||||||
|
void walResetEnv() {
|
||||||
|
TearDown();
|
||||||
|
taosRemoveDir(pathName);
|
||||||
|
SetUp();
|
||||||
|
}
|
||||||
|
|
||||||
|
void SetUp() override {
|
||||||
|
SWalCfg cfg;
|
||||||
|
cfg.rollPeriod = -1,
|
||||||
|
cfg.segSize = -1,
|
||||||
|
cfg.retentionPeriod = -1,
|
||||||
|
cfg.retentionSize = 0,
|
||||||
|
cfg.rollPeriod = 0,
|
||||||
|
cfg.vgId = 0,
|
||||||
|
cfg.level = TAOS_WAL_FSYNC;
|
||||||
|
pWal = walOpen(pathName, &cfg);
|
||||||
|
ASSERT(pWal != NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
void TearDown() override {
|
||||||
|
walClose(pWal);
|
||||||
|
pWal = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SWal* pWal = NULL;
|
||||||
|
const char* pathName = "/tmp/wal_test";
|
||||||
|
};
|
||||||
|
|
||||||
TEST_F(WalCleanEnv, createNew) {
|
TEST_F(WalCleanEnv, createNew) {
|
||||||
walRollFileInfo(pWal);
|
walRollFileInfo(pWal);
|
||||||
ASSERT(pWal->fileInfoSet != NULL);
|
ASSERT(pWal->fileInfoSet != NULL);
|
||||||
|
@ -283,3 +320,61 @@ TEST_F(WalKeepEnv, readHandleRead) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(WalRetentionEnv, repairMeta1) {
|
||||||
|
walResetEnv();
|
||||||
|
int code;
|
||||||
|
|
||||||
|
int i;
|
||||||
|
for (i = 0; i < 100; i++) {
|
||||||
|
char newStr[100];
|
||||||
|
sprintf(newStr, "%s-%d", ranStr, i);
|
||||||
|
int len = strlen(newStr);
|
||||||
|
code = walWrite(pWal, i, 0, newStr, len);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
TearDown();
|
||||||
|
|
||||||
|
//getchar();
|
||||||
|
char buf[100];
|
||||||
|
sprintf(buf, "%s/meta-ver%d", pathName, 0);
|
||||||
|
remove(buf);
|
||||||
|
SetUp();
|
||||||
|
|
||||||
|
ASSERT_EQ(pWal->vers.lastVer, 99);
|
||||||
|
|
||||||
|
SWalReadHandle* pRead = walOpenReadHandle(pWal);
|
||||||
|
ASSERT(pRead != NULL);
|
||||||
|
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
int ver = rand() % 100;
|
||||||
|
code = walReadWithHandle(pRead, ver);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
// printf("rrbody: \n");
|
||||||
|
// for(int i = 0; i < pRead->pHead->head.len; i++) {
|
||||||
|
// printf("%d ", pRead->pHead->head.body[i]);
|
||||||
|
//}
|
||||||
|
// printf("\n");
|
||||||
|
|
||||||
|
ASSERT_EQ(pRead->pHead->head.version, ver);
|
||||||
|
ASSERT_EQ(pRead->curVersion, ver + 1);
|
||||||
|
char newStr[100];
|
||||||
|
sprintf(newStr, "%s-%d", ranStr, ver);
|
||||||
|
int len = strlen(newStr);
|
||||||
|
ASSERT_EQ(pRead->pHead->head.len, len);
|
||||||
|
for (int j = 0; j < len; j++) {
|
||||||
|
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (i = 100; i < 200; i++) {
|
||||||
|
char newStr[100];
|
||||||
|
sprintf(newStr, "%s-%d", ranStr, i);
|
||||||
|
int len = strlen(newStr);
|
||||||
|
code = walWrite(pWal, i, 0, newStr, len);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -342,7 +342,7 @@ void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t compa
|
||||||
|
|
||||||
int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags) {
|
int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags) {
|
||||||
void* item = taosArraySearch(pArray, key, comparFn, flags);
|
void* item = taosArraySearch(pArray, key, comparFn, flags);
|
||||||
return (int32_t)((char*)item - (char*)pArray->pData) / pArray->elemSize;
|
return item == NULL ? -1 : (int32_t)((char*)item - (char*)pArray->pData) / pArray->elemSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosArraySortString(SArray* pArray, __compar_fn_t comparFn) {
|
void taosArraySortString(SArray* pArray, __compar_fn_t comparFn) {
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
#include <random>
|
#include <random>
|
||||||
|
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
|
#include "tcompare.h"
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
|
@ -48,3 +49,34 @@ static void remove_batch_test() {
|
||||||
TEST(arrayTest, array_list_test) {
|
TEST(arrayTest, array_list_test) {
|
||||||
remove_batch_test();
|
remove_batch_test();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(arrayTest, array_search_test) {
|
||||||
|
SArray *pa = (SArray*) taosArrayInit(4, sizeof(int32_t));
|
||||||
|
|
||||||
|
for(int32_t i = 10; i < 20; ++i) {
|
||||||
|
int32_t a = i;
|
||||||
|
taosArrayPush(pa, &a);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int i = 0; i < 30; i++) {
|
||||||
|
int32_t k = i;
|
||||||
|
int32_t* pRet = (int32_t*)taosArraySearch(pa, &k, compareInt32Val, TD_GE);
|
||||||
|
int32_t idx = taosArraySearchIdx(pa, &k, compareInt32Val, TD_GE);
|
||||||
|
|
||||||
|
if(pRet == NULL) {
|
||||||
|
ASSERT_EQ(idx, -1);
|
||||||
|
} else {
|
||||||
|
ASSERT_EQ(taosArrayGet(pa, idx), pRet);
|
||||||
|
}
|
||||||
|
|
||||||
|
pRet = (int32_t*)taosArraySearch(pa, &k, compareInt32Val, TD_LE);
|
||||||
|
idx = taosArraySearchIdx(pa, &k, compareInt32Val, TD_LE);
|
||||||
|
|
||||||
|
if(pRet == NULL) {
|
||||||
|
ASSERT_EQ(idx, -1);
|
||||||
|
} else {
|
||||||
|
ASSERT_EQ(taosArrayGet(pa, idx), pRet);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue