add test for wal snapshot
This commit is contained in:
parent
f45083c6aa
commit
9e4b890727
|
@ -38,6 +38,24 @@ typedef enum {
|
|||
TAOS_WAL_FSYNC = 2
|
||||
} EWalType;
|
||||
|
||||
typedef struct SWalReadHead {
|
||||
int8_t sver;
|
||||
uint8_t msgType;
|
||||
int8_t reserved[2];
|
||||
int32_t len;
|
||||
int64_t version;
|
||||
char cont[];
|
||||
} SWalReadHead;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int32_t fsyncPeriod; // millisecond
|
||||
int32_t retentionPeriod; // secs
|
||||
int32_t rollPeriod; // secs
|
||||
int64_t segSize;
|
||||
EWalType walLevel; // wal level
|
||||
} SWalCfg;
|
||||
|
||||
typedef struct {
|
||||
//union {
|
||||
//uint32_t info;
|
||||
|
@ -47,25 +65,11 @@ typedef struct {
|
|||
//uint32_t reserved : 24;
|
||||
//};
|
||||
//};
|
||||
int8_t sver;
|
||||
uint8_t msgType;
|
||||
int8_t reserved[2];
|
||||
int32_t len;
|
||||
int64_t version;
|
||||
uint32_t signature;
|
||||
uint32_t cksumHead;
|
||||
uint32_t cksumBody;
|
||||
char cont[];
|
||||
SWalReadHead head;
|
||||
} SWalHead;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int32_t fsyncPeriod; // millisecond
|
||||
int32_t rollPeriod;
|
||||
int64_t segSize;
|
||||
EWalType walLevel; // wal level
|
||||
} SWalCfg;
|
||||
|
||||
#define WAL_PREFIX "wal"
|
||||
#define WAL_PREFIX_LEN 3
|
||||
#define WAL_NOSUFFIX_LEN 20
|
||||
|
@ -80,7 +84,7 @@ typedef struct {
|
|||
//#define WAL_FILE_NUM 1 // 3
|
||||
#define WAL_FILESET_MAX 128
|
||||
|
||||
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
|
||||
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
|
||||
#define WAL_CUR_POS_WRITABLE 1
|
||||
#define WAL_CUR_FILE_WRITABLE 2
|
||||
#define WAL_CUR_FAILED 4
|
||||
|
@ -103,21 +107,17 @@ typedef struct SWal {
|
|||
//write tfd
|
||||
int64_t writeLogTfd;
|
||||
int64_t writeIdxTfd;
|
||||
//read tfd
|
||||
int64_t readLogTfd;
|
||||
int64_t readIdxTfd;
|
||||
//current version
|
||||
int64_t curVersion;
|
||||
//wal lifecycle
|
||||
int64_t firstVersion;
|
||||
int64_t snapshotVersion;
|
||||
int64_t commitVersion;
|
||||
int64_t lastVersion;
|
||||
//snapshotting version
|
||||
int64_t snapshottingVer;
|
||||
//roll status
|
||||
int64_t lastRollSeq;
|
||||
//file set
|
||||
int32_t writeCur;
|
||||
int32_t readCur;
|
||||
SArray* fileInfoSet;
|
||||
//ctl
|
||||
int32_t curStatus;
|
||||
|
@ -148,7 +148,8 @@ int32_t walCommit(SWal *, int64_t ver);
|
|||
// truncate after
|
||||
int32_t walRollback(SWal *, int64_t ver);
|
||||
// notify that previous logs can be pruned safely
|
||||
int32_t walTakeSnapshot(SWal *, int64_t ver);
|
||||
int32_t walBeginTakeSnapshot(SWal *, int64_t ver);
|
||||
int32_t walEndTakeSnapshot(SWal *);
|
||||
//int32_t walDataCorrupted(SWal*);
|
||||
|
||||
// read
|
||||
|
|
|
@ -153,6 +153,13 @@ void taosArraySet(SArray* pArray, size_t index, void* pData);
|
|||
*/
|
||||
void taosArrayPopFrontBatch(SArray* pArray, size_t cnt);
|
||||
|
||||
/**
|
||||
* remove some data entry from front
|
||||
* @param pArray
|
||||
* @param cnt
|
||||
*/
|
||||
void taosArrayPopTailBatch(SArray* pArray, size_t cnt);
|
||||
|
||||
/**
|
||||
* remove data entry of the given index
|
||||
* @param pArray
|
||||
|
@ -213,6 +220,14 @@ void taosArraySortString(SArray* pArray, __compar_fn_t comparFn);
|
|||
*/
|
||||
void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags);
|
||||
|
||||
/**
|
||||
* search the array, return index of the element
|
||||
* @param pArray
|
||||
* @param compar
|
||||
* @param key
|
||||
*/
|
||||
int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags);
|
||||
|
||||
/**
|
||||
* search the array
|
||||
* @param pArray
|
||||
|
|
|
@ -39,7 +39,7 @@ static FORCE_INLINE int taosCalcChecksumAppend(TSCKSUM csi, uint8_t *stream, uin
|
|||
}
|
||||
|
||||
static FORCE_INLINE int taosCheckChecksum(const uint8_t *stream, uint32_t ssize, TSCKSUM checksum) {
|
||||
return (checksum == (*crc32c)(0, stream, (size_t)ssize));
|
||||
return (checksum != (*crc32c)(0, stream, (size_t)ssize));
|
||||
}
|
||||
|
||||
static FORCE_INLINE int taosCheckChecksumWhole(const uint8_t *stream, uint32_t ssize) {
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
#include "wal.h"
|
||||
#include "compare.h"
|
||||
#include "tchecksum.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -32,6 +33,11 @@ typedef struct WalFileInfo {
|
|||
int64_t fileSize;
|
||||
} WalFileInfo;
|
||||
|
||||
typedef struct WalIdxEntry {
|
||||
int64_t ver;
|
||||
int64_t offset;
|
||||
} WalIdxEntry;
|
||||
|
||||
static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) {
|
||||
WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft;
|
||||
WalFileInfo* pInfoRight = (WalFileInfo*)pRight;
|
||||
|
@ -79,6 +85,26 @@ static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) {
|
|||
return sprintf(buf, "%s/%" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer);
|
||||
}
|
||||
|
||||
static inline int walValidHeadCksum(SWalHead* pHead) {
|
||||
return taosCheckChecksum((uint8_t*)&pHead->head, sizeof(SWalReadHead), pHead->cksumHead);
|
||||
}
|
||||
|
||||
static inline int walValidBodyCksum(SWalHead* pHead) {
|
||||
return taosCheckChecksum((uint8_t*)pHead->head.cont, pHead->head.len, pHead->cksumBody);
|
||||
}
|
||||
|
||||
static inline int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) {
|
||||
return walValidHeadCksum(pHead) && walValidBodyCksum(pHead);
|
||||
}
|
||||
|
||||
static inline uint32_t walCalcHeadCksum(SWalHead *pHead) {
|
||||
return taosCalcChecksum(0, (uint8_t*)&pHead->head, sizeof(SWalReadHead));
|
||||
}
|
||||
|
||||
static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) {
|
||||
return taosCalcChecksum(0, (uint8_t*)body, len);
|
||||
}
|
||||
|
||||
int walReadMeta(SWal* pWal);
|
||||
int walWriteMeta(SWal* pWal);
|
||||
int walRollFileInfo(SWal* pWal);
|
||||
|
@ -87,6 +113,10 @@ char* walMetaSerialize(SWal* pWal);
|
|||
int walMetaDeserialize(SWal* pWal, const char* bytes);
|
||||
//meta section end
|
||||
|
||||
//seek section
|
||||
int walChangeFile(SWal *pWal, int64_t ver);
|
||||
//seek section end
|
||||
|
||||
int64_t walGetSeq();
|
||||
int walSeekVer(SWal *pWal, int64_t ver);
|
||||
int walRoll(SWal *pWal);
|
||||
|
|
|
@ -24,6 +24,18 @@
|
|||
#include <libgen.h>
|
||||
#include <regex.h>
|
||||
|
||||
int64_t walGetFirstVer(SWal *pWal) {
|
||||
return pWal->firstVersion;
|
||||
}
|
||||
|
||||
int64_t walGetSnaphostVer(SWal *pWal) {
|
||||
return pWal->snapshotVersion;
|
||||
}
|
||||
|
||||
int64_t walGetLastVer(SWal *pWal) {
|
||||
return pWal->lastVersion;
|
||||
}
|
||||
|
||||
int walRollFileInfo(SWal* pWal) {
|
||||
int64_t ts = taosGetTimestampSec();
|
||||
|
||||
|
|
|
@ -82,6 +82,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
|||
}
|
||||
pWal->writeLogTfd = -1;
|
||||
pWal->writeIdxTfd = -1;
|
||||
pWal->writeCur = -1;
|
||||
|
||||
//set config
|
||||
pWal->vgId = pCfg->vgId;
|
||||
|
@ -90,13 +91,20 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
|||
pWal->segSize = pCfg->segSize;
|
||||
pWal->level = pCfg->walLevel;
|
||||
|
||||
//init status
|
||||
//init version info
|
||||
pWal->firstVersion = -1;
|
||||
pWal->commitVersion = -1;
|
||||
pWal->snapshotVersion = -1;
|
||||
pWal->lastVersion = -1;
|
||||
|
||||
pWal->snapshottingVer = -1;
|
||||
|
||||
//init status
|
||||
pWal->lastRollSeq = -1;
|
||||
|
||||
//init write buffer
|
||||
memset(&pWal->head, 0, sizeof(SWalHead));
|
||||
pWal->head.sver = 0;
|
||||
pWal->head.head.sver = 0;
|
||||
|
||||
tstrncpy(pWal->path, path, sizeof(pWal->path));
|
||||
pthread_mutex_init(&pWal->mutex, NULL);
|
||||
|
|
|
@ -15,19 +15,6 @@
|
|||
|
||||
#include "walInt.h"
|
||||
#include "tfile.h"
|
||||
#include "tchecksum.h"
|
||||
|
||||
static inline int walValidHeadCksum(SWalHead* pHead) {
|
||||
return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead);
|
||||
}
|
||||
|
||||
static inline int walValidBodyCksum(SWalHead* pHead) {
|
||||
return taosCheckChecksum((uint8_t*)pHead->cont, pHead->len, pHead->cksumBody);
|
||||
}
|
||||
|
||||
static int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) {
|
||||
return walValidHeadCksum(pHead) && walValidBodyCksum(pHead);
|
||||
}
|
||||
|
||||
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
|
||||
int code;
|
||||
|
@ -49,13 +36,13 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
|
|||
if(walValidHeadCksum(*ppHead) != 0) {
|
||||
return -1;
|
||||
}
|
||||
void* ptr = realloc(*ppHead, sizeof(SWalHead) + (*ppHead)->len);
|
||||
void* ptr = realloc(*ppHead, sizeof(SWalHead) + (*ppHead)->head.len);
|
||||
if(ptr == NULL) {
|
||||
free(*ppHead);
|
||||
*ppHead = NULL;
|
||||
return -1;
|
||||
}
|
||||
if(tfRead(pWal->writeLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) {
|
||||
if(tfRead(pWal->writeLogTfd, (*ppHead)->head.cont, (*ppHead)->head.len) != (*ppHead)->head.len) {
|
||||
return -1;
|
||||
}
|
||||
//TODO: endian compatibility processing after read
|
||||
|
@ -69,18 +56,3 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
|
|||
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t walGetFirstVer(SWal *pWal) {
|
||||
if (pWal == NULL) return 0;
|
||||
return pWal->firstVersion;
|
||||
}
|
||||
|
||||
int64_t walGetSnaphostVer(SWal *pWal) {
|
||||
if (pWal == NULL) return 0;
|
||||
return pWal->snapshotVersion;
|
||||
}
|
||||
|
||||
int64_t walGetLastVer(SWal *pWal) {
|
||||
if (pWal == NULL) return 0;
|
||||
return pWal->lastVersion;
|
||||
}
|
||||
|
|
|
@ -43,12 +43,35 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
|
|||
if (code != 0) {
|
||||
return -1;
|
||||
}
|
||||
/*pWal->curLogOffset = readBuf[1];*/
|
||||
pWal->curVersion = ver;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int walChangeFile(SWal *pWal, int64_t ver) {
|
||||
int walChangeFileToLast(SWal *pWal) {
|
||||
int64_t idxTfd, logTfd;
|
||||
WalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
|
||||
ASSERT(pRet != NULL);
|
||||
int64_t fileFirstVer = pRet->firstVer;
|
||||
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
||||
idxTfd = tfOpenReadWrite(fnameStr);
|
||||
if(idxTfd < 0) {
|
||||
return -1;
|
||||
}
|
||||
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
||||
logTfd = tfOpenReadWrite(fnameStr);
|
||||
if(logTfd < 0) {
|
||||
return -1;
|
||||
}
|
||||
//switch file
|
||||
pWal->writeIdxTfd = idxTfd;
|
||||
pWal->writeLogTfd = logTfd;
|
||||
//change status
|
||||
pWal->curStatus = WAL_CUR_FILE_WRITABLE;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int walChangeFile(SWal *pWal, int64_t ver) {
|
||||
int code = 0;
|
||||
int64_t idxTfd, logTfd;
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
|
@ -86,21 +109,21 @@ static int walChangeFile(SWal *pWal, int64_t ver) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int walGetVerOffset(SWal* pWal, int64_t ver) {
|
||||
int code;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int walSeekVer(SWal *pWal, int64_t ver) {
|
||||
int code;
|
||||
if((!(pWal->curStatus & WAL_CUR_FAILED)) && ver == pWal->curVersion) {
|
||||
if(ver == pWal->lastVersion) {
|
||||
return 0;
|
||||
}
|
||||
if(ver > pWal->lastVersion) {
|
||||
//TODO: some records are skipped
|
||||
return -1;
|
||||
}
|
||||
if(ver < pWal->firstVersion) {
|
||||
//TODO: try to seek pruned log
|
||||
if(ver > pWal->lastVersion || ver < pWal->firstVersion) {
|
||||
return -1;
|
||||
}
|
||||
if(ver < pWal->snapshotVersion) {
|
||||
//TODO: seek snapshotted log, invalid in some cases
|
||||
//TODO: set flag to prevent roll back
|
||||
}
|
||||
if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
|
||||
code = walChangeFile(pWal, ver);
|
|
@ -21,65 +21,6 @@
|
|||
#include "tfile.h"
|
||||
#include "walInt.h"
|
||||
|
||||
static void walFtruncate(SWal *pWal, int64_t ver);
|
||||
|
||||
int32_t walCommit(SWal *pWal, int64_t ver) {
|
||||
ASSERT(pWal->snapshotVersion <= pWal->commitVersion);
|
||||
ASSERT(pWal->commitVersion <= pWal->lastVersion);
|
||||
ASSERT(ver >= pWal->commitVersion);
|
||||
ASSERT(ver <= pWal->lastVersion);
|
||||
pWal->commitVersion = ver;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||
//TODO: ftruncate
|
||||
ASSERT(ver > pWal->commitVersion);
|
||||
ASSERT(ver <= pWal->lastVersion);
|
||||
//seek position
|
||||
walSeekVer(pWal, ver);
|
||||
walFtruncate(pWal, ver);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
|
||||
pWal->snapshotVersion = ver;
|
||||
int ts = taosGetTimestampSec();
|
||||
|
||||
int deleteCnt = 0;
|
||||
int64_t newTotSize = pWal->totSize;
|
||||
WalFileInfo tmp;
|
||||
tmp.firstVer = ver;
|
||||
//mark files safe to delete
|
||||
WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
|
||||
//iterate files, until the searched result
|
||||
for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
|
||||
if(pWal->totSize > pWal->retentionSize ||
|
||||
iter->closeTs + pWal->retentionPeriod > ts) {
|
||||
//delete according to file size or close time
|
||||
deleteCnt++;
|
||||
newTotSize -= iter->fileSize;
|
||||
}
|
||||
}
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
//remove file
|
||||
for(int i = 0; i < deleteCnt; i++) {
|
||||
WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, i);
|
||||
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
|
||||
remove(fnameStr);
|
||||
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
|
||||
remove(fnameStr);
|
||||
}
|
||||
|
||||
//save snapshot ver, commit ver
|
||||
|
||||
|
||||
//make new array, remove files
|
||||
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
|
||||
pWal->totSize = newTotSize;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId);
|
||||
|
@ -172,6 +113,185 @@ void walRemoveAllOldFiles(void *handle) {
|
|||
}
|
||||
#endif
|
||||
|
||||
int32_t walCommit(SWal *pWal, int64_t ver) {
|
||||
ASSERT(pWal->commitVersion >= pWal->snapshotVersion);
|
||||
ASSERT(pWal->commitVersion <= pWal->lastVersion);
|
||||
if(ver < pWal->commitVersion || ver > pWal->lastVersion) {
|
||||
return -1;
|
||||
}
|
||||
pWal->commitVersion = ver;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||
int code;
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
if(ver == pWal->lastVersion) {
|
||||
return 0;
|
||||
}
|
||||
if(ver > pWal->lastVersion || ver < pWal->commitVersion) {
|
||||
return -1;
|
||||
}
|
||||
pthread_mutex_lock(&pWal->mutex);
|
||||
|
||||
//find correct file
|
||||
if(ver < walGetLastFileFirstVer(pWal)) {
|
||||
//close current files
|
||||
tfClose(pWal->writeIdxTfd);
|
||||
tfClose(pWal->writeLogTfd);
|
||||
//open old files
|
||||
code = walChangeFile(pWal, ver);
|
||||
if(code != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
//delete files
|
||||
int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
|
||||
for(int i = pWal->writeCur; i < fileSetSize; i++) {
|
||||
walBuildLogName(pWal, ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
|
||||
remove(fnameStr);
|
||||
walBuildIdxName(pWal, ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
|
||||
remove(fnameStr);
|
||||
}
|
||||
//pop from fileInfoSet
|
||||
taosArraySetSize(pWal->fileInfoSet, pWal->writeCur + 1);
|
||||
}
|
||||
|
||||
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
|
||||
int64_t idxTfd = tfOpenReadWrite(fnameStr);
|
||||
|
||||
//change to deserialize function
|
||||
|
||||
if(idxTfd < 0) {
|
||||
pthread_mutex_unlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
int idxOff = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE;
|
||||
code = tfLseek(idxTfd, idxOff, SEEK_SET);
|
||||
if(code < 0) {
|
||||
pthread_mutex_unlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
//read idx file and get log file pos
|
||||
//TODO:change to deserialize function
|
||||
WalIdxEntry entry;
|
||||
if(tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) {
|
||||
pthread_mutex_unlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
ASSERT(entry.ver == ver);
|
||||
|
||||
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
|
||||
int64_t logTfd = tfOpenReadWrite(fnameStr);
|
||||
if(logTfd < 0) {
|
||||
//TODO
|
||||
pthread_mutex_unlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
code = tfLseek(logTfd, entry.offset, SEEK_SET);
|
||||
if(code < 0) {
|
||||
//TODO
|
||||
pthread_mutex_unlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
//validate offset
|
||||
SWalHead head;
|
||||
ASSERT(tfValid(logTfd));
|
||||
int size = tfRead(logTfd, &head, sizeof(SWalHead));
|
||||
if(size != sizeof(SWalHead)) {
|
||||
return -1;
|
||||
}
|
||||
code = walValidHeadCksum(&head);
|
||||
|
||||
ASSERT(code == 0);
|
||||
if(code != 0) {
|
||||
return -1;
|
||||
}
|
||||
if(head.head.version != ver) {
|
||||
//TODO
|
||||
return -1;
|
||||
}
|
||||
//truncate old files
|
||||
code = tfFtruncate(logTfd, entry.offset);
|
||||
if(code < 0) {
|
||||
return -1;
|
||||
}
|
||||
code = tfFtruncate(idxTfd, idxOff);
|
||||
if(code < 0) {
|
||||
return -1;
|
||||
}
|
||||
pWal->lastVersion = ver - 1;
|
||||
((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
|
||||
((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
|
||||
|
||||
//unlock
|
||||
pthread_mutex_unlock(&pWal->mutex);
|
||||
return 0;
|
||||
}
|
||||
int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) {
|
||||
pWal->snapshottingVer = ver;
|
||||
//check file rolling
|
||||
if(pWal->retentionPeriod == 0) {
|
||||
walRoll(pWal);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t walEndTakeSnapshot(SWal *pWal) {
|
||||
int64_t ver = pWal->snapshottingVer;
|
||||
if(ver == -1) return -1;
|
||||
|
||||
pWal->snapshotVersion = ver;
|
||||
int ts = taosGetTimestampSec();
|
||||
|
||||
int deleteCnt = 0;
|
||||
int64_t newTotSize = pWal->totSize;
|
||||
WalFileInfo tmp;
|
||||
tmp.firstVer = ver;
|
||||
//find files safe to delete
|
||||
WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
|
||||
if(ver >= pInfo->lastVer) {
|
||||
pInfo++;
|
||||
}
|
||||
//iterate files, until the searched result
|
||||
for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
|
||||
if(pWal->totSize > pWal->retentionSize ||
|
||||
iter->closeTs + pWal->retentionPeriod > ts) {
|
||||
//delete according to file size or close time
|
||||
deleteCnt++;
|
||||
newTotSize -= iter->fileSize;
|
||||
}
|
||||
}
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
//remove file
|
||||
for(int i = 0; i < deleteCnt; i++) {
|
||||
WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, i);
|
||||
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
|
||||
remove(fnameStr);
|
||||
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
|
||||
remove(fnameStr);
|
||||
}
|
||||
|
||||
//make new array, remove files
|
||||
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
|
||||
if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
|
||||
pWal->firstVersion = -1;
|
||||
} else {
|
||||
pWal->firstVersion = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
|
||||
}
|
||||
pWal->totSize = newTotSize;
|
||||
pWal->snapshottingVer = -1;
|
||||
|
||||
//save snapshot ver, commit ver
|
||||
int code = walWriteMeta(pWal);
|
||||
if(code != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int walRoll(SWal *pWal) {
|
||||
int code = 0;
|
||||
if(pWal->writeIdxTfd != -1) {
|
||||
|
@ -211,6 +331,7 @@ int walRoll(SWal *pWal) {
|
|||
//switch file
|
||||
pWal->writeIdxTfd = idxTfd;
|
||||
pWal->writeLogTfd = logTfd;
|
||||
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
||||
//change status
|
||||
pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE;
|
||||
|
||||
|
@ -218,32 +339,6 @@ int walRoll(SWal *pWal) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int walChangeFileToLast(SWal *pWal) {
|
||||
int64_t idxTfd, logTfd;
|
||||
WalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
|
||||
ASSERT(pRet != NULL);
|
||||
int64_t fileFirstVer = pRet->firstVer;
|
||||
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
||||
idxTfd = tfOpenReadWrite(fnameStr);
|
||||
if(idxTfd < 0) {
|
||||
return -1;
|
||||
}
|
||||
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
||||
logTfd = tfOpenReadWrite(fnameStr);
|
||||
if(logTfd < 0) {
|
||||
return -1;
|
||||
}
|
||||
//switch file
|
||||
pWal->writeIdxTfd = idxTfd;
|
||||
pWal->writeLogTfd = logTfd;
|
||||
//change status
|
||||
pWal->curVersion = fileFirstVer;
|
||||
pWal->curStatus = WAL_CUR_FILE_WRITABLE;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||
int code = 0;
|
||||
//get index file
|
||||
|
@ -253,9 +348,11 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
|||
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
|
||||
return code;
|
||||
}
|
||||
int64_t writeBuf[2] = { ver, offset };
|
||||
int size = tfWrite(pWal->writeIdxTfd, writeBuf, sizeof(writeBuf));
|
||||
if(size != sizeof(writeBuf)) {
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
|
||||
WalIdxEntry entry = { .ver = ver, .offset = offset };
|
||||
int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(WalIdxEntry));
|
||||
if(size != sizeof(WalIdxEntry)) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
|
@ -270,13 +367,14 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
|
|||
|
||||
if (index == pWal->lastVersion + 1) {
|
||||
if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
|
||||
pWal->firstVersion = index;
|
||||
code = walRoll(pWal);
|
||||
ASSERT(code == 0);
|
||||
} else {
|
||||
int64_t passed = walGetSeq() - pWal->lastRollSeq;
|
||||
if(pWal->rollPeriod != -1 && passed > pWal->rollPeriod) {
|
||||
if(pWal->rollPeriod != -1 && pWal->rollPeriod != 0 && passed > pWal->rollPeriod) {
|
||||
walRoll(pWal);
|
||||
} else if(pWal->segSize != -1 && walGetLastFileSize(pWal) > pWal->segSize) {
|
||||
} else if(pWal->segSize != -1 && pWal->segSize != 0 && walGetLastFileSize(pWal) > pWal->segSize) {
|
||||
walRoll(pWal);
|
||||
}
|
||||
}
|
||||
|
@ -287,16 +385,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
|
|||
}
|
||||
/*if (!tfValid(pWal->curLogTfd)) return 0;*/
|
||||
|
||||
pWal->head.version = index;
|
||||
|
||||
pWal->head.signature = WAL_SIGNATURE;
|
||||
pWal->head.len = bodyLen;
|
||||
pWal->head.msgType = msgType;
|
||||
|
||||
pWal->head.cksumHead = taosCalcChecksum(0, (const uint8_t*)&pWal->head, sizeof(SWalHead)- sizeof(uint32_t)*2);
|
||||
pWal->head.cksumBody = taosCalcChecksum(0, (const uint8_t*)&body, bodyLen);
|
||||
|
||||
pthread_mutex_lock(&pWal->mutex);
|
||||
pWal->head.head.version = index;
|
||||
|
||||
pWal->head.head.len = bodyLen;
|
||||
pWal->head.head.msgType = msgType;
|
||||
pWal->head.cksumHead = walCalcHeadCksum(&pWal->head);
|
||||
pWal->head.cksumBody = walCalcBodyCksum(body, bodyLen);
|
||||
|
||||
if (tfWrite(pWal->writeLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) {
|
||||
//ftruncate
|
||||
|
@ -312,6 +407,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
|
|||
code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal));
|
||||
if(code != 0) {
|
||||
//TODO
|
||||
return -1;
|
||||
}
|
||||
|
||||
//set status
|
||||
|
@ -326,8 +422,6 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
|
|||
}
|
||||
|
||||
void walFsync(SWal *pWal, bool forceFsync) {
|
||||
if (pWal == NULL || !tfValid(pWal->writeLogTfd)) return;
|
||||
|
||||
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
|
||||
wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal));
|
||||
if (tfFsync(pWal->writeLogTfd) < 0) {
|
||||
|
@ -408,7 +502,7 @@ static int walValidateOffset(SWal* pWal, int64_t ver) {
|
|||
int code = 0;
|
||||
SWalHead *pHead = NULL;
|
||||
code = (int)walRead(pWal, &pHead, ver);
|
||||
if(pHead->version != ver) {
|
||||
if(pHead->head.version != ver) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
|
@ -428,15 +522,6 @@ static int64_t walGetOffset(SWal* pWal, int64_t ver) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void walFtruncate(SWal *pWal, int64_t ver) {
|
||||
int64_t tfd = pWal->writeLogTfd;
|
||||
tfFtruncate(tfd, ver);
|
||||
tfFsync(tfd);
|
||||
tfd = pWal->writeIdxTfd;
|
||||
tfFtruncate(tfd, ver * WAL_IDX_ENTRY_SIZE);
|
||||
tfFsync(tfd);
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
|
||||
int64_t pos = *offset;
|
||||
|
|
|
@ -36,6 +36,36 @@ class WalCleanEnv : public ::testing::Test {
|
|||
const char* pathName = "/tmp/wal_test";
|
||||
};
|
||||
|
||||
class WalCleanDeleteEnv : public ::testing::Test {
|
||||
protected:
|
||||
static void SetUpTestCase() {
|
||||
int code = walInit();
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
|
||||
static void TearDownTestCase() {
|
||||
walCleanUp();
|
||||
}
|
||||
|
||||
void SetUp() override {
|
||||
taosRemoveDir(pathName);
|
||||
SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal));
|
||||
memset(pCfg, 0, sizeof(SWalCfg));
|
||||
pCfg->retentionPeriod = 0;
|
||||
pCfg->walLevel = TAOS_WAL_FSYNC;
|
||||
pWal = walOpen(pathName, pCfg);
|
||||
ASSERT(pWal != NULL);
|
||||
}
|
||||
|
||||
void TearDown() override {
|
||||
walClose(pWal);
|
||||
pWal = NULL;
|
||||
}
|
||||
|
||||
SWal* pWal = NULL;
|
||||
const char* pathName = "/tmp/wal_test";
|
||||
};
|
||||
|
||||
class WalKeepEnv : public ::testing::Test {
|
||||
protected:
|
||||
static void SetUpTestCase() {
|
||||
|
@ -110,40 +140,94 @@ TEST_F(WalCleanEnv, removeOldMeta) {
|
|||
ASSERT(code == 0);
|
||||
}
|
||||
|
||||
TEST_F(WalKeepEnv, readOldMeta) {
|
||||
int code = walRollFileInfo(pWal);
|
||||
ASSERT(code == 0);
|
||||
code = walWriteMeta(pWal);
|
||||
ASSERT(code == 0);
|
||||
code = walRollFileInfo(pWal);
|
||||
ASSERT(code == 0);
|
||||
code = walWriteMeta(pWal);
|
||||
ASSERT(code == 0);
|
||||
char*oldss = walMetaSerialize(pWal);
|
||||
//TEST_F(WalKeepEnv, readOldMeta) {
|
||||
//int code = walRollFileInfo(pWal);
|
||||
//ASSERT(code == 0);
|
||||
//code = walWriteMeta(pWal);
|
||||
//ASSERT(code == 0);
|
||||
//code = walRollFileInfo(pWal);
|
||||
//ASSERT(code == 0);
|
||||
//code = walWriteMeta(pWal);
|
||||
//ASSERT(code == 0);
|
||||
//char*oldss = walMetaSerialize(pWal);
|
||||
|
||||
TearDown();
|
||||
SetUp();
|
||||
code = walReadMeta(pWal);
|
||||
ASSERT(code == 0);
|
||||
char* newss = walMetaSerialize(pWal);
|
||||
//TearDown();
|
||||
//SetUp();
|
||||
//code = walReadMeta(pWal);
|
||||
//ASSERT(code == 0);
|
||||
//char* newss = walMetaSerialize(pWal);
|
||||
|
||||
int len = strlen(oldss);
|
||||
ASSERT_EQ(len, strlen(newss));
|
||||
for(int i = 0; i < len; i++) {
|
||||
EXPECT_EQ(oldss[i], newss[i]);
|
||||
}
|
||||
}
|
||||
//int len = strlen(oldss);
|
||||
//ASSERT_EQ(len, strlen(newss));
|
||||
//for(int i = 0; i < len; i++) {
|
||||
//EXPECT_EQ(oldss[i], newss[i]);
|
||||
//}
|
||||
//}
|
||||
|
||||
TEST_F(WalKeepEnv, write) {
|
||||
TEST_F(WalCleanEnv, write) {
|
||||
const char* ranStr = "tvapq02tcp";
|
||||
const int len = strlen(ranStr);
|
||||
int code;
|
||||
for(int i = 0; i < 10; i++) {
|
||||
code = walWrite(pWal, i, i+1, (void*)ranStr, len);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(pWal->lastVersion, i);
|
||||
code = walWrite(pWal, i+2, i, (void*)ranStr, len);
|
||||
ASSERT_EQ(code, -1);
|
||||
ASSERT_EQ(pWal->lastVersion, i);
|
||||
}
|
||||
code = walWriteMeta(pWal);
|
||||
ASSERT_EQ(code, 0);
|
||||
}
|
||||
|
||||
TEST_F(WalCleanEnv, rollback) {
|
||||
const char* ranStr = "tvapq02tcp";
|
||||
const int len = strlen(ranStr);
|
||||
int code;
|
||||
for(int i = 0; i < 10; i++) {
|
||||
code = walWrite(pWal, i, i+1, (void*)ranStr, len);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(pWal->lastVersion, i);
|
||||
}
|
||||
code = walRollback(pWal, 5);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(pWal->lastVersion, 4);
|
||||
code = walRollback(pWal, 3);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(pWal->lastVersion, 2);
|
||||
code = walWriteMeta(pWal);
|
||||
ASSERT_EQ(code, 0);
|
||||
}
|
||||
|
||||
TEST_F(WalCleanDeleteEnv, roll) {
|
||||
const char* ranStr = "tvapq02tcp";
|
||||
const int len = strlen(ranStr);
|
||||
int code;
|
||||
int i;
|
||||
for(i = 0; i < 100; i++) {
|
||||
code = walWrite(pWal, i, 0, (void*)ranStr, len);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(pWal->lastVersion, i);
|
||||
code = walCommit(pWal, i);
|
||||
ASSERT_EQ(pWal->commitVersion, i);
|
||||
}
|
||||
|
||||
walBeginTakeSnapshot(pWal, i-1);
|
||||
ASSERT_EQ(pWal->snapshottingVer, i-1);
|
||||
walEndTakeSnapshot(pWal);
|
||||
ASSERT_EQ(pWal->snapshotVersion, i-1);
|
||||
ASSERT_EQ(pWal->snapshottingVer, -1);
|
||||
|
||||
code = walWrite(pWal, 5, 0, (void*)ranStr, len);
|
||||
ASSERT_NE(code, 0);
|
||||
|
||||
for(; i < 200; i++) {
|
||||
code = walWrite(pWal, i, 0, (void*)ranStr, len);
|
||||
ASSERT_EQ(code, 0);
|
||||
code = walCommit(pWal, i);
|
||||
ASSERT_EQ(pWal->commitVersion, i);
|
||||
}
|
||||
|
||||
code = walWriteMeta(pWal);
|
||||
ASSERT_EQ(code, 0);
|
||||
}
|
||||
|
|
|
@ -241,12 +241,16 @@ void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
|
|||
assert(cnt <= pArray->size);
|
||||
pArray->size = pArray->size - cnt;
|
||||
if(pArray->size == 0) {
|
||||
pArray->size = 0;
|
||||
return;
|
||||
}
|
||||
memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size);
|
||||
}
|
||||
|
||||
void taosArrayPopTailBatch(SArray* pArray, size_t cnt) {
|
||||
assert(cnt <= pArray->size);
|
||||
pArray->size = pArray->size - cnt;
|
||||
}
|
||||
|
||||
void taosArrayRemove(SArray* pArray, size_t index) {
|
||||
assert(index < pArray->size);
|
||||
|
||||
|
@ -329,6 +333,11 @@ void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t compa
|
|||
return taosbsearch(key, pArray->pData, pArray->size, pArray->elemSize, comparFn, flags);
|
||||
}
|
||||
|
||||
int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags) {
|
||||
void* item = taosArraySearch(pArray, key, comparFn, flags);
|
||||
return (int32_t)((char*)item - (char*)pArray->pData) / pArray->elemSize;
|
||||
}
|
||||
|
||||
void taosArraySortString(SArray* pArray, __compar_fn_t comparFn) {
|
||||
assert(pArray != NULL);
|
||||
qsort(pArray->pData, pArray->size, pArray->elemSize, comparFn);
|
||||
|
|
Loading…
Reference in New Issue