refactor wal
This commit is contained in:
parent
7340781afd
commit
c048dc5b60
|
@ -32,23 +32,19 @@ extern int32_t wDebugFlag;
|
||||||
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
|
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
|
||||||
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
|
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
|
||||||
|
|
||||||
#define WAL_PREFIX "wal"
|
#define WAL_HEAD_VER 0
|
||||||
#define WAL_PREFIX_LEN 3
|
|
||||||
#define WAL_NOSUFFIX_LEN 20
|
#define WAL_NOSUFFIX_LEN 20
|
||||||
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1)
|
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1)
|
||||||
#define WAL_LOG_SUFFIX "log"
|
#define WAL_LOG_SUFFIX "log"
|
||||||
#define WAL_INDEX_SUFFIX "idx"
|
#define WAL_INDEX_SUFFIX "idx"
|
||||||
#define WAL_REFRESH_MS 1000
|
#define WAL_REFRESH_MS 1000
|
||||||
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
|
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
|
||||||
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
||||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||||
|
|
||||||
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
|
#define WAL_CUR_FAILED 1
|
||||||
#define WAL_CUR_POS_WRITABLE 1
|
|
||||||
#define WAL_CUR_FILE_WRITABLE 2
|
|
||||||
#define WAL_CUR_FAILED 4
|
|
||||||
|
|
||||||
#pragma pack(push,1)
|
#pragma pack(push, 1)
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TAOS_WAL_NOLOG = 0,
|
TAOS_WAL_NOLOG = 0,
|
||||||
TAOS_WAL_WRITE = 1,
|
TAOS_WAL_WRITE = 1,
|
||||||
|
@ -56,11 +52,11 @@ typedef enum {
|
||||||
} EWalType;
|
} EWalType;
|
||||||
|
|
||||||
typedef struct SWalReadHead {
|
typedef struct SWalReadHead {
|
||||||
int8_t sver;
|
int8_t headVer;
|
||||||
uint8_t msgType;
|
uint8_t msgType;
|
||||||
int8_t reserved[2];
|
int8_t reserved[2];
|
||||||
int32_t len;
|
int32_t len;
|
||||||
//int64_t ingestTs; //not implemented
|
int64_t ingestTs; //not implemented
|
||||||
int64_t version;
|
int64_t version;
|
||||||
char body[];
|
char body[];
|
||||||
} SWalReadHead;
|
} SWalReadHead;
|
||||||
|
@ -76,14 +72,6 @@ typedef struct {
|
||||||
} SWalCfg;
|
} SWalCfg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
//union {
|
|
||||||
//uint32_t info;
|
|
||||||
//struct {
|
|
||||||
//uint32_t sver:3;
|
|
||||||
//uint32_t msgtype: 5;
|
|
||||||
//uint32_t reserved : 24;
|
|
||||||
//};
|
|
||||||
//};
|
|
||||||
uint32_t cksumHead;
|
uint32_t cksumHead;
|
||||||
uint32_t cksumBody;
|
uint32_t cksumBody;
|
||||||
SWalReadHead head;
|
SWalReadHead head;
|
||||||
|
@ -102,16 +90,16 @@ typedef struct SWal {
|
||||||
SWalCfg cfg;
|
SWalCfg cfg;
|
||||||
SWalVer vers;
|
SWalVer vers;
|
||||||
//file set
|
//file set
|
||||||
int32_t writeCur;
|
|
||||||
int64_t writeLogTfd;
|
int64_t writeLogTfd;
|
||||||
int64_t writeIdxTfd;
|
int64_t writeIdxTfd;
|
||||||
|
int32_t writeCur;
|
||||||
SArray* fileInfoSet;
|
SArray* fileInfoSet;
|
||||||
//ctl
|
//statistics
|
||||||
int32_t curStatus;
|
|
||||||
int32_t fsyncSeq;
|
|
||||||
int64_t totSize;
|
int64_t totSize;
|
||||||
int64_t refId;
|
|
||||||
int64_t lastRollSeq;
|
int64_t lastRollSeq;
|
||||||
|
//ctl
|
||||||
|
int32_t fsyncSeq;
|
||||||
|
int64_t refId;
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
//path
|
//path
|
||||||
char path[WAL_PATH_LEN];
|
char path[WAL_PATH_LEN];
|
||||||
|
@ -131,7 +119,7 @@ typedef struct SWalReadHandle {
|
||||||
} SWalReadHandle;
|
} SWalReadHandle;
|
||||||
#pragma pack(pop)
|
#pragma pack(pop)
|
||||||
|
|
||||||
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
|
//typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
|
||||||
|
|
||||||
// module initialization
|
// module initialization
|
||||||
int32_t walInit();
|
int32_t walInit();
|
||||||
|
@ -151,8 +139,8 @@ int32_t walCommit(SWal *, int64_t ver);
|
||||||
// truncate after
|
// truncate after
|
||||||
int32_t walRollback(SWal *, int64_t ver);
|
int32_t walRollback(SWal *, int64_t ver);
|
||||||
// notify that previous logs can be pruned safely
|
// notify that previous logs can be pruned safely
|
||||||
int32_t walBeginTakeSnapshot(SWal *, int64_t ver);
|
int32_t walBeginSnapshot(SWal *, int64_t ver);
|
||||||
int32_t walEndTakeSnapshot(SWal *);
|
int32_t walEndSnapshot(SWal *);
|
||||||
//int32_t walDataCorrupted(SWal*);
|
//int32_t walDataCorrupted(SWal*);
|
||||||
|
|
||||||
// read
|
// read
|
||||||
|
@ -161,7 +149,7 @@ void walCloseReadHandle(SWalReadHandle *);
|
||||||
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
|
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
|
||||||
|
|
||||||
int32_t walRead(SWal *, SWalHead **, int64_t ver);
|
int32_t walRead(SWal *, SWalHead **, int64_t ver);
|
||||||
int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
|
//int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
|
||||||
|
|
||||||
// lifecycle check
|
// lifecycle check
|
||||||
int64_t walGetFirstVer(SWal *);
|
int64_t walGetFirstVer(SWal *);
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
#ifndef _TD_UTIL_REF_H
|
#ifndef _TD_UTIL_REF_H
|
||||||
#define _TD_UTIL_REF_H
|
#define _TD_UTIL_REF_H
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -33,12 +33,10 @@ typedef struct WalFileInfo {
|
||||||
int64_t fileSize;
|
int64_t fileSize;
|
||||||
} WalFileInfo;
|
} WalFileInfo;
|
||||||
|
|
||||||
#pragma pack(push,1)
|
|
||||||
typedef struct WalIdxEntry {
|
typedef struct WalIdxEntry {
|
||||||
int64_t ver;
|
int64_t ver;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
} WalIdxEntry;
|
} WalIdxEntry;
|
||||||
#pragma pack(pop)
|
|
||||||
|
|
||||||
static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) {
|
static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) {
|
||||||
WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft;
|
WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft;
|
||||||
|
@ -107,8 +105,16 @@ static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) {
|
||||||
return taosCalcChecksum(0, (uint8_t*)body, len);
|
return taosCalcChecksum(0, (uint8_t*)body, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
int walReadMeta(SWal* pWal);
|
static inline void walResetVer(SWalVer* pVer) {
|
||||||
int walWriteMeta(SWal* pWal);
|
pVer->firstVer = -1;
|
||||||
|
pVer->verInSnapshotting = -1;
|
||||||
|
pVer->snapshotVer = -1;
|
||||||
|
pVer->commitVer = -1;
|
||||||
|
pVer->lastVer = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int walLoadMeta(SWal* pWal);
|
||||||
|
int walSaveMeta(SWal* pWal);
|
||||||
int walRollFileInfo(SWal* pWal);
|
int walRollFileInfo(SWal* pWal);
|
||||||
|
|
||||||
char* walMetaSerialize(SWal* pWal);
|
char* walMetaSerialize(SWal* pWal);
|
||||||
|
|
|
@ -24,18 +24,22 @@
|
||||||
#include <libgen.h>
|
#include <libgen.h>
|
||||||
#include <regex.h>
|
#include <regex.h>
|
||||||
|
|
||||||
int64_t walGetFirstVer(SWal *pWal) {
|
int64_t inline walGetFirstVer(SWal *pWal) {
|
||||||
return pWal->vers.firstVer;
|
return pWal->vers.firstVer;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t walGetSnaphostVer(SWal *pWal) {
|
int64_t inline walGetSnaphostVer(SWal *pWal) {
|
||||||
return pWal->vers.snapshotVer;
|
return pWal->vers.snapshotVer;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t walGetLastVer(SWal *pWal) {
|
int64_t inline walGetLastVer(SWal *pWal) {
|
||||||
return pWal->vers.lastVer;
|
return pWal->vers.lastVer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
|
||||||
|
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
|
||||||
|
}
|
||||||
|
|
||||||
int walRollFileInfo(SWal* pWal) {
|
int walRollFileInfo(SWal* pWal) {
|
||||||
int64_t ts = taosGetTimestampSec();
|
int64_t ts = taosGetTimestampSec();
|
||||||
|
|
||||||
|
@ -150,10 +154,6 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
|
|
||||||
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int walFindCurMetaVer(SWal* pWal) {
|
static int walFindCurMetaVer(SWal* pWal) {
|
||||||
const char * pattern = "^meta-ver[0-9]+$";
|
const char * pattern = "^meta-ver[0-9]+$";
|
||||||
regex_t walMetaRegexPattern;
|
regex_t walMetaRegexPattern;
|
||||||
|
@ -182,7 +182,7 @@ static int walFindCurMetaVer(SWal* pWal) {
|
||||||
return metaVer;
|
return metaVer;
|
||||||
}
|
}
|
||||||
|
|
||||||
int walWriteMeta(SWal* pWal) {
|
int walSaveMeta(SWal* pWal) {
|
||||||
int metaVer = walFindCurMetaVer(pWal);
|
int metaVer = walFindCurMetaVer(pWal);
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
walBuildMetaName(pWal, metaVer+1, fnameStr);
|
walBuildMetaName(pWal, metaVer+1, fnameStr);
|
||||||
|
@ -207,7 +207,7 @@ int walWriteMeta(SWal* pWal) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int walReadMeta(SWal* pWal) {
|
int walLoadMeta(SWal* pWal) {
|
||||||
ASSERT(pWal->fileInfoSet->size == 0);
|
ASSERT(pWal->fileInfoSet->size == 0);
|
||||||
//find existing meta file
|
//find existing meta file
|
||||||
int metaVer = walFindCurMetaVer(pWal);
|
int metaVer = walFindCurMetaVer(pWal);
|
||||||
|
|
|
@ -21,23 +21,17 @@
|
||||||
#include "compare.h"
|
#include "compare.h"
|
||||||
#include "walInt.h"
|
#include "walInt.h"
|
||||||
|
|
||||||
//internal
|
|
||||||
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
|
|
||||||
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
|
|
||||||
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t refSetId;
|
|
||||||
uint32_t seq;
|
|
||||||
int8_t stop;
|
int8_t stop;
|
||||||
int8_t inited;
|
int8_t inited;
|
||||||
|
uint32_t seq;
|
||||||
|
int32_t refSetId;
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
} SWalMgmt;
|
} SWalMgmt;
|
||||||
|
|
||||||
static SWalMgmt tsWal = {0, .seq = 1};
|
static SWalMgmt tsWal = {0, .seq = 1};
|
||||||
static int32_t walCreateThread();
|
static int32_t walCreateThread();
|
||||||
static void walStopThread();
|
static void walStopThread();
|
||||||
static int32_t walInitObj(SWal *pWal);
|
|
||||||
static void walFreeObj(void *pWal);
|
static void walFreeObj(void *pWal);
|
||||||
|
|
||||||
int64_t walGetSeq() {
|
int64_t walGetSeq() {
|
||||||
|
@ -68,7 +62,7 @@ int32_t walInit() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void walCleanUp() {
|
void walCleanUp() {
|
||||||
int old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0);
|
int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0);
|
||||||
if(old == 0) {
|
if(old == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -83,48 +77,59 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
memset(pWal, 0, sizeof(SWal));
|
|
||||||
pWal->writeLogTfd = -1;
|
|
||||||
pWal->writeIdxTfd = -1;
|
|
||||||
pWal->writeCur = -1;
|
|
||||||
|
|
||||||
//set config
|
//set config
|
||||||
memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg));
|
memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg));
|
||||||
|
pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
|
||||||
|
if(pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
|
||||||
|
|
||||||
//init version info
|
tstrncpy(pWal->path, path, sizeof(pWal->path));
|
||||||
pWal->vers.firstVer = -1;
|
if(taosMkDir(pWal->path) != 0) {
|
||||||
pWal->vers.commitVer = -1;
|
wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
|
||||||
pWal->vers.snapshotVer = -1;
|
return NULL;
|
||||||
pWal->vers.lastVer = -1;
|
}
|
||||||
|
|
||||||
pWal->vers.verInSnapshotting = -1;
|
//open meta
|
||||||
|
pWal->writeLogTfd = -1;
|
||||||
pWal->totSize = 0;
|
pWal->writeIdxTfd = -1;
|
||||||
|
pWal->writeCur = -1;
|
||||||
|
pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo));
|
||||||
|
if(pWal->fileInfoSet == NULL) {
|
||||||
|
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
|
||||||
|
free(pWal);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
//init status
|
//init status
|
||||||
|
walResetVer(&pWal->vers);
|
||||||
|
pWal->totSize = 0;
|
||||||
pWal->lastRollSeq = -1;
|
pWal->lastRollSeq = -1;
|
||||||
|
|
||||||
//init write buffer
|
//init write buffer
|
||||||
memset(&pWal->writeHead, 0, sizeof(SWalHead));
|
memset(&pWal->writeHead, 0, sizeof(SWalHead));
|
||||||
pWal->writeHead.head.sver = 0;
|
pWal->writeHead.head.headVer = WAL_HEAD_VER;
|
||||||
|
|
||||||
tstrncpy(pWal->path, path, sizeof(pWal->path));
|
if(pthread_mutex_init(&pWal->mutex, NULL) < 0) {
|
||||||
pthread_mutex_init(&pWal->mutex, NULL);
|
taosArrayDestroy(pWal->fileInfoSet);
|
||||||
|
free(pWal);
|
||||||
pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
|
|
||||||
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
|
|
||||||
|
|
||||||
if (walInitObj(pWal) != 0) {
|
|
||||||
walFreeObj(pWal);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pWal->refId = taosAddRef(tsWal.refSetId, pWal);
|
pWal->refId = taosAddRef(tsWal.refSetId, pWal);
|
||||||
if (pWal->refId < 0) {
|
if(pWal->refId < 0) {
|
||||||
walFreeObj(pWal);
|
pthread_mutex_destroy(&pWal->mutex);
|
||||||
|
taosArrayDestroy(pWal->fileInfoSet);
|
||||||
|
free(pWal);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(walLoadMeta(pWal) < 0) {
|
||||||
|
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
||||||
|
pthread_mutex_destroy(&pWal->mutex);
|
||||||
|
taosArrayDestroy(pWal->fileInfoSet);
|
||||||
|
free(pWal);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
walReadMeta(pWal);
|
|
||||||
|
|
||||||
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod);
|
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod);
|
||||||
|
|
||||||
|
@ -152,43 +157,23 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void walClose(SWal *pWal) {
|
void walClose(SWal *pWal) {
|
||||||
if (pWal == NULL) return;
|
|
||||||
|
|
||||||
pthread_mutex_lock(&pWal->mutex);
|
pthread_mutex_lock(&pWal->mutex);
|
||||||
tfClose(pWal->writeLogTfd);
|
tfClose(pWal->writeLogTfd);
|
||||||
pWal->writeLogTfd = -1;
|
pWal->writeLogTfd = -1;
|
||||||
tfClose(pWal->writeIdxTfd);
|
tfClose(pWal->writeIdxTfd);
|
||||||
pWal->writeIdxTfd = -1;
|
pWal->writeIdxTfd = -1;
|
||||||
walWriteMeta(pWal);
|
walSaveMeta(pWal);
|
||||||
taosArrayDestroy(pWal->fileInfoSet);
|
taosArrayDestroy(pWal->fileInfoSet);
|
||||||
pWal->fileInfoSet = NULL;
|
pWal->fileInfoSet = NULL;
|
||||||
pthread_mutex_unlock(&pWal->mutex);
|
pthread_mutex_unlock(&pWal->mutex);
|
||||||
|
|
||||||
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t walInitObj(SWal *pWal) {
|
|
||||||
if (taosMkDir(pWal->path) != 0) {
|
|
||||||
wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
|
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
|
||||||
}
|
|
||||||
pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo));
|
|
||||||
if(pWal->fileInfoSet == NULL) {
|
|
||||||
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
|
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
|
||||||
}
|
|
||||||
|
|
||||||
wDebug("vgId:%d, object is initialized", pWal->cfg.vgId);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void walFreeObj(void *wal) {
|
static void walFreeObj(void *wal) {
|
||||||
SWal *pWal = wal;
|
SWal *pWal = wal;
|
||||||
wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal);
|
wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal);
|
||||||
|
|
||||||
tfClose(pWal->writeLogTfd);
|
|
||||||
tfClose(pWal->writeIdxTfd);
|
|
||||||
taosArrayDestroy(pWal->fileInfoSet);
|
|
||||||
pWal->fileInfoSet = NULL;
|
|
||||||
pthread_mutex_destroy(&pWal->mutex);
|
pthread_mutex_destroy(&pWal->mutex);
|
||||||
tfree(pWal);
|
tfree(pWal);
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,7 +54,7 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
|
||||||
int64_t logTfd = pRead->readLogTfd;
|
int64_t logTfd = pRead->readLogTfd;
|
||||||
|
|
||||||
//seek position
|
//seek position
|
||||||
int64_t offset = (ver - fileFirstVer) * WAL_IDX_ENTRY_SIZE;
|
int64_t offset = (ver - fileFirstVer) * sizeof(WalIdxEntry);
|
||||||
code = tfLseek(idxTfd, offset, SEEK_SET);
|
code = tfLseek(idxTfd, offset, SEEK_SET);
|
||||||
if(code < 0) {
|
if(code < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -210,6 +210,6 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
|
/*int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {*/
|
||||||
return 0;
|
/*return 0;*/
|
||||||
}
|
/*}*/
|
||||||
|
|
|
@ -27,7 +27,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
|
||||||
int64_t logTfd = pWal->writeLogTfd;
|
int64_t logTfd = pWal->writeLogTfd;
|
||||||
|
|
||||||
//seek position
|
//seek position
|
||||||
int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE;
|
int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry);
|
||||||
code = tfLseek(idxTfd, offset, SEEK_SET);
|
code = tfLseek(idxTfd, offset, SEEK_SET);
|
||||||
if(code != 0) {
|
if(code != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -66,8 +66,6 @@ int walChangeFileToLast(SWal *pWal) {
|
||||||
//switch file
|
//switch file
|
||||||
pWal->writeIdxTfd = idxTfd;
|
pWal->writeIdxTfd = idxTfd;
|
||||||
pWal->writeLogTfd = logTfd;
|
pWal->writeLogTfd = logTfd;
|
||||||
//change status
|
|
||||||
pWal->curStatus = WAL_CUR_FILE_WRITABLE;
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,13 +91,11 @@ int walChangeFile(SWal *pWal, int64_t ver) {
|
||||||
int64_t fileFirstVer = pRet->firstVer;
|
int64_t fileFirstVer = pRet->firstVer;
|
||||||
//closed
|
//closed
|
||||||
if(taosArrayGetLast(pWal->fileInfoSet) != pRet) {
|
if(taosArrayGetLast(pWal->fileInfoSet) != pRet) {
|
||||||
pWal->curStatus &= ~WAL_CUR_FILE_WRITABLE;
|
|
||||||
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
||||||
idxTfd = tfOpenRead(fnameStr);
|
idxTfd = tfOpenRead(fnameStr);
|
||||||
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
||||||
logTfd = tfOpenRead(fnameStr);
|
logTfd = tfOpenRead(fnameStr);
|
||||||
} else {
|
} else {
|
||||||
pWal->curStatus |= WAL_CUR_FILE_WRITABLE;
|
|
||||||
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
||||||
idxTfd = tfOpenReadWrite(fnameStr);
|
idxTfd = tfOpenReadWrite(fnameStr);
|
||||||
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
||||||
|
|
|
@ -1,120 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
|
||||||
#include "os.h"
|
|
||||||
#include "walInt.h"
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) {
|
|
||||||
int64_t curFileId = *nextFileId;
|
|
||||||
int64_t minFileId = INT64_MAX;
|
|
||||||
|
|
||||||
DIR *dir = opendir(pWal->path);
|
|
||||||
if (dir == NULL) {
|
|
||||||
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct dirent *ent;
|
|
||||||
while ((ent = readdir(dir)) != NULL) {
|
|
||||||
char *name = ent->d_name;
|
|
||||||
|
|
||||||
if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
|
|
||||||
int64_t id = atoll(name + WAL_PREFIX_LEN);
|
|
||||||
if (id <= curFileId) continue;
|
|
||||||
|
|
||||||
if (id < minFileId) {
|
|
||||||
minFileId = id;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
closedir(dir);
|
|
||||||
|
|
||||||
if (minFileId == INT64_MAX) return -1;
|
|
||||||
|
|
||||||
*nextFileId = minFileId;
|
|
||||||
wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " nextFileId:%" PRId64, pWal->vgId, pWal->path, curFileId, *nextFileId);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId) {
|
|
||||||
int64_t minFileId = INT64_MAX;
|
|
||||||
|
|
||||||
DIR *dir = opendir(pWal->path);
|
|
||||||
if (dir == NULL) {
|
|
||||||
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct dirent *ent;
|
|
||||||
while ((ent = readdir(dir)) != NULL) {
|
|
||||||
char *name = ent->d_name;
|
|
||||||
|
|
||||||
if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
|
|
||||||
int64_t id = atoll(name + WAL_PREFIX_LEN);
|
|
||||||
if (id >= curFileId) continue;
|
|
||||||
|
|
||||||
minDiff--;
|
|
||||||
if (id < minFileId) {
|
|
||||||
minFileId = id;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
closedir(dir);
|
|
||||||
|
|
||||||
if (minFileId == INT64_MAX) return -1;
|
|
||||||
if (minDiff > 0) return -1;
|
|
||||||
|
|
||||||
*oldFileId = minFileId;
|
|
||||||
wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " oldFildId:%" PRId64, pWal->vgId, pWal->path, curFileId, *oldFileId);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) {
|
|
||||||
int64_t maxFileId = INT64_MIN;
|
|
||||||
|
|
||||||
DIR *dir = opendir(pWal->path);
|
|
||||||
if (dir == NULL) {
|
|
||||||
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct dirent *ent;
|
|
||||||
while ((ent = readdir(dir)) != NULL) {
|
|
||||||
char *name = ent->d_name;
|
|
||||||
|
|
||||||
if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
|
|
||||||
int64_t id = atoll(name + WAL_PREFIX_LEN);
|
|
||||||
if (id > maxFileId) {
|
|
||||||
maxFileId = id;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
closedir(dir);
|
|
||||||
|
|
||||||
if (maxFileId == INT64_MIN) {
|
|
||||||
*newFileId = 0;
|
|
||||||
} else {
|
|
||||||
*newFileId = maxFileId;
|
|
||||||
}
|
|
||||||
|
|
||||||
wTrace("vgId:%d, path:%s, newFileId:%" PRId64, pWal->vgId, pWal->path, *newFileId);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
|
@ -21,98 +21,6 @@
|
||||||
#include "tfile.h"
|
#include "tfile.h"
|
||||||
#include "walInt.h"
|
#include "walInt.h"
|
||||||
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId);
|
|
||||||
|
|
||||||
int32_t walRenew(void *handle) {
|
|
||||||
if (handle == NULL) return 0;
|
|
||||||
|
|
||||||
SWal * pWal = handle;
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
/*if (pWal->stop) {*/
|
|
||||||
/*wDebug("vgId:%d, do not create a new wal file", pWal->vgId);*/
|
|
||||||
/*return 0;*/
|
|
||||||
/*}*/
|
|
||||||
|
|
||||||
pthread_mutex_lock(&pWal->mutex);
|
|
||||||
|
|
||||||
if (tfValid(pWal->logTfd)) {
|
|
||||||
tfClose(pWal->logTfd);
|
|
||||||
wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->logName);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*if (pWal->keep == TAOS_WAL_KEEP) {*/
|
|
||||||
/*pWal->fileId = 0;*/
|
|
||||||
/*} else {*/
|
|
||||||
/*if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0;*/
|
|
||||||
/*pWal->fileId++;*/
|
|
||||||
/*}*/
|
|
||||||
|
|
||||||
snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId);
|
|
||||||
pWal->logTfd = tfOpenCreateWrite(pWal->logName);
|
|
||||||
|
|
||||||
if (!tfValid(pWal->logTfd)) {
|
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno));
|
|
||||||
} else {
|
|
||||||
wDebug("vgId:%d, file:%s, it is created and open while renew", pWal->vgId, pWal->logName);
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&pWal->mutex);
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
void walRemoveOneOldFile(void *handle) {
|
|
||||||
SWal *pWal = handle;
|
|
||||||
if (pWal == NULL) return;
|
|
||||||
/*if (pWal->keep == TAOS_WAL_KEEP) return;*/
|
|
||||||
if (!tfValid(pWal->logTfd)) return;
|
|
||||||
|
|
||||||
pthread_mutex_lock(&pWal->mutex);
|
|
||||||
|
|
||||||
// remove the oldest wal file
|
|
||||||
int64_t oldFileId = -1;
|
|
||||||
if (walGetOldFile(pWal, pWal->curFileId, WAL_FILE_NUM, &oldFileId) == 0) {
|
|
||||||
char walName[WAL_FILE_LEN] = {0};
|
|
||||||
snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId);
|
|
||||||
|
|
||||||
if (remove(walName) < 0) {
|
|
||||||
wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno));
|
|
||||||
} else {
|
|
||||||
wInfo("vgId:%d, file:%s, it is removed", pWal->vgId, walName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&pWal->mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
void walRemoveAllOldFiles(void *handle) {
|
|
||||||
if (handle == NULL) return;
|
|
||||||
|
|
||||||
SWal * pWal = handle;
|
|
||||||
int64_t fileId = -1;
|
|
||||||
|
|
||||||
pthread_mutex_lock(&pWal->mutex);
|
|
||||||
|
|
||||||
tfClose(pWal->logTfd);
|
|
||||||
wDebug("vgId:%d, file:%s, it is closed before remove all wals", pWal->vgId, pWal->logName);
|
|
||||||
|
|
||||||
while (walGetNextFile(pWal, &fileId) >= 0) {
|
|
||||||
snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
|
|
||||||
|
|
||||||
if (remove(pWal->logName) < 0) {
|
|
||||||
wError("vgId:%d, wal:%p file:%s, failed to remove since %s", pWal->vgId, pWal, pWal->logName, strerror(errno));
|
|
||||||
} else {
|
|
||||||
wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->logName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pthread_mutex_unlock(&pWal->mutex);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t walCommit(SWal *pWal, int64_t ver) {
|
int32_t walCommit(SWal *pWal, int64_t ver) {
|
||||||
ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
|
ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
|
||||||
ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
|
ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
|
||||||
|
@ -166,7 +74,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||||
pthread_mutex_unlock(&pWal->mutex);
|
pthread_mutex_unlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
int idxOff = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE;
|
int idxOff = (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry);
|
||||||
code = tfLseek(idxTfd, idxOff, SEEK_SET);
|
code = tfLseek(idxTfd, idxOff, SEEK_SET);
|
||||||
if(code < 0) {
|
if(code < 0) {
|
||||||
pthread_mutex_unlock(&pWal->mutex);
|
pthread_mutex_unlock(&pWal->mutex);
|
||||||
|
@ -229,7 +137,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) {
|
int32_t walBeginSnapshot(SWal* pWal, int64_t ver) {
|
||||||
pWal->vers.verInSnapshotting = ver;
|
pWal->vers.verInSnapshotting = ver;
|
||||||
//check file rolling
|
//check file rolling
|
||||||
if(pWal->cfg.retentionPeriod == 0) {
|
if(pWal->cfg.retentionPeriod == 0) {
|
||||||
|
@ -239,7 +147,7 @@ int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walEndTakeSnapshot(SWal *pWal) {
|
int32_t walEndSnapshot(SWal *pWal) {
|
||||||
int64_t ver = pWal->vers.verInSnapshotting;
|
int64_t ver = pWal->vers.verInSnapshotting;
|
||||||
if(ver == -1) return -1;
|
if(ver == -1) return -1;
|
||||||
|
|
||||||
|
@ -287,7 +195,7 @@ int32_t walEndTakeSnapshot(SWal *pWal) {
|
||||||
pWal->vers.verInSnapshotting = -1;
|
pWal->vers.verInSnapshotting = -1;
|
||||||
|
|
||||||
//save snapshot ver, commit ver
|
//save snapshot ver, commit ver
|
||||||
int code = walWriteMeta(pWal);
|
int code = walSaveMeta(pWal);
|
||||||
if(code != 0) {
|
if(code != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -314,13 +222,13 @@ int walRoll(SWal *pWal) {
|
||||||
int64_t newFileFirstVersion = pWal->vers.lastVer + 1;
|
int64_t newFileFirstVersion = pWal->vers.lastVer + 1;
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
|
walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
|
||||||
idxTfd = tfOpenCreateWrite(fnameStr);
|
idxTfd = tfOpenCreateWriteAppend(fnameStr);
|
||||||
if(idxTfd < 0) {
|
if(idxTfd < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
|
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
|
||||||
logTfd = tfOpenCreateWrite(fnameStr);
|
logTfd = tfOpenCreateWriteAppend(fnameStr);
|
||||||
if(logTfd < 0) {
|
if(logTfd < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -335,8 +243,6 @@ int walRoll(SWal *pWal) {
|
||||||
pWal->writeIdxTfd = idxTfd;
|
pWal->writeIdxTfd = idxTfd;
|
||||||
pWal->writeLogTfd = logTfd;
|
pWal->writeLogTfd = logTfd;
|
||||||
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
||||||
//change status
|
|
||||||
pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE;
|
|
||||||
|
|
||||||
pWal->lastRollSeq = walGetSeq();
|
pWal->lastRollSeq = walGetSeq();
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -425,74 +331,6 @@ void walFsync(SWal *pWal, bool forceFsync) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
|
|
||||||
if (handle == NULL) return -1;
|
|
||||||
|
|
||||||
SWal * pWal = handle;
|
|
||||||
int32_t count = 0;
|
|
||||||
int32_t code = 0;
|
|
||||||
int64_t fileId = -1;
|
|
||||||
|
|
||||||
while ((code = walGetNextFile(pWal, &fileId)) >= 0) {
|
|
||||||
/*if (fileId == pWal->curFileId) continue;*/
|
|
||||||
|
|
||||||
char walName[WAL_FILE_LEN];
|
|
||||||
snprintf(walName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
|
|
||||||
|
|
||||||
wInfo("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
|
|
||||||
code = walRestoreWalFile(pWal, pVnode, writeFp, walName, fileId);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
wError("vgId:%d, file:%s, failed to restore since %s", pWal->vgId, walName, tstrerror(code));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
wInfo("vgId:%d, file:%s, restore success, wver:%" PRIu64, pWal->vgId, walName, pWal->curVersion);
|
|
||||||
|
|
||||||
count++;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*if (pWal->keep != TAOS_WAL_KEEP) return TSDB_CODE_SUCCESS;*/
|
|
||||||
|
|
||||||
if (count == 0) {
|
|
||||||
wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId);
|
|
||||||
return walRenew(pWal);
|
|
||||||
} else {
|
|
||||||
// open the existing WAL file in append mode
|
|
||||||
/*pWal->curFileId = 0;*/
|
|
||||||
snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId);
|
|
||||||
pWal->logTfd = tfOpenCreateWriteAppend(pWal->logName);
|
|
||||||
if (!tfValid(pWal->logTfd)) {
|
|
||||||
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno));
|
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
|
||||||
}
|
|
||||||
wDebug("vgId:%d, file:%s, it is created and open while restore", pWal->vgId, pWal->logName);
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
|
|
||||||
if (handle == NULL) return -1;
|
|
||||||
SWal *pWal = handle;
|
|
||||||
|
|
||||||
if (*fileId == 0) *fileId = -1;
|
|
||||||
|
|
||||||
pthread_mutex_lock(&(pWal->mutex));
|
|
||||||
|
|
||||||
int32_t code = walGetNextFile(pWal, fileId);
|
|
||||||
if (code >= 0) {
|
|
||||||
sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId);
|
|
||||||
/*code = (*fileId == pWal->curFileId) ? 0 : 1;*/
|
|
||||||
}
|
|
||||||
|
|
||||||
wDebug("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->curFileId, *fileId);
|
|
||||||
pthread_mutex_unlock(&(pWal->mutex));
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/*static int walValidateOffset(SWal* pWal, int64_t ver) {*/
|
/*static int walValidateOffset(SWal* pWal, int64_t ver) {*/
|
||||||
/*int code = 0;*/
|
/*int code = 0;*/
|
||||||
/*SWalHead *pHead = NULL;*/
|
/*SWalHead *pHead = NULL;*/
|
||||||
|
@ -516,139 +354,3 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
|
||||||
|
|
||||||
/*return 0;*/
|
/*return 0;*/
|
||||||
/*}*/
|
/*}*/
|
||||||
|
|
||||||
#if 0
|
|
||||||
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
|
|
||||||
int64_t pos = *offset;
|
|
||||||
while (1) {
|
|
||||||
pos++;
|
|
||||||
|
|
||||||
if (tfLseek(tfd, pos, SEEK_SET) < 0) {
|
|
||||||
wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno));
|
|
||||||
return TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tfRead(tfd, pHead, sizeof(SWalHead)) <= 0) {
|
|
||||||
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
|
|
||||||
return TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pHead->signature != WAL_SIGNATURE) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pHead->sver >= 1) {
|
|
||||||
if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) {
|
|
||||||
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
|
|
||||||
return TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (walValidateChecksum(pHead)) {
|
|
||||||
wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos);
|
|
||||||
*offset = pos;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) {
|
|
||||||
int32_t size = WAL_MAX_SIZE;
|
|
||||||
void * buffer = malloc(size);
|
|
||||||
if (buffer == NULL) {
|
|
||||||
wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
|
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t tfd = tfOpenReadWrite(name);
|
|
||||||
if (!tfValid(tfd)) {
|
|
||||||
wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
|
|
||||||
tfree(buffer);
|
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
|
||||||
} else {
|
|
||||||
wDebug("vgId:%d, file:%s, open for restore", pWal->vgId, name);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
int64_t offset = 0;
|
|
||||||
SWalHead *pHead = buffer;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
int32_t ret = (int32_t)tfRead(tfd, pHead, sizeof(SWalHead));
|
|
||||||
if (ret == 0) break;
|
|
||||||
|
|
||||||
if (ret < 0) {
|
|
||||||
wError("vgId:%d, file:%s, failed to read wal head since %s", pWal->vgId, name, strerror(errno));
|
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ret < sizeof(SWalHead)) {
|
|
||||||
wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret);
|
|
||||||
walFtruncate(pWal, tfd, offset);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) {
|
|
||||||
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
|
|
||||||
pHead->version, pHead->len, offset);
|
|
||||||
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
walFtruncate(pWal, tfd, offset);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) {
|
|
||||||
wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
|
|
||||||
pHead->version, pHead->len, offset);
|
|
||||||
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
walFtruncate(pWal, tfd, offset);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len);
|
|
||||||
if (ret < 0) {
|
|
||||||
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
|
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ret < pHead->len) {
|
|
||||||
wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len);
|
|
||||||
offset += sizeof(SWalHead);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((pHead->sver >= 1) && !walValidateChecksum(pHead)) {
|
|
||||||
wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
|
|
||||||
pHead->version, pHead->len, offset);
|
|
||||||
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
walFtruncate(pWal, tfd, offset);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
offset = offset + sizeof(SWalHead) + pHead->len;
|
|
||||||
|
|
||||||
wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64,
|
|
||||||
pWal->vgId, fileId, pHead->version, pWal->curVersion, pHead->len, offset);
|
|
||||||
|
|
||||||
pWal->curVersion = pHead->version;
|
|
||||||
|
|
||||||
// wInfo("writeFp: %ld", offset);
|
|
||||||
(*writeFp)(pVnode, pHead);
|
|
||||||
}
|
|
||||||
|
|
||||||
tfClose(tfd);
|
|
||||||
tfree(buffer);
|
|
||||||
|
|
||||||
wDebug("vgId:%d, file:%s, it is closed after restore", pWal->vgId, name);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
|
@ -142,7 +142,7 @@ TEST_F(WalCleanEnv, serialize) {
|
||||||
char*ss = walMetaSerialize(pWal);
|
char*ss = walMetaSerialize(pWal);
|
||||||
printf("%s\n", ss);
|
printf("%s\n", ss);
|
||||||
free(ss);
|
free(ss);
|
||||||
code = walWriteMeta(pWal);
|
code = walSaveMeta(pWal);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,11 +150,11 @@ TEST_F(WalCleanEnv, removeOldMeta) {
|
||||||
int code = walRollFileInfo(pWal);
|
int code = walRollFileInfo(pWal);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
ASSERT(pWal->fileInfoSet != NULL);
|
ASSERT(pWal->fileInfoSet != NULL);
|
||||||
code = walWriteMeta(pWal);
|
code = walSaveMeta(pWal);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
code = walRollFileInfo(pWal);
|
code = walRollFileInfo(pWal);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
code = walWriteMeta(pWal);
|
code = walSaveMeta(pWal);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -199,7 +199,7 @@ TEST_F(WalCleanEnv, write) {
|
||||||
ASSERT_EQ(code, -1);
|
ASSERT_EQ(code, -1);
|
||||||
ASSERT_EQ(pWal->vers.lastVer, i);
|
ASSERT_EQ(pWal->vers.lastVer, i);
|
||||||
}
|
}
|
||||||
code = walWriteMeta(pWal);
|
code = walSaveMeta(pWal);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -216,7 +216,7 @@ TEST_F(WalCleanEnv, rollback) {
|
||||||
code = walRollback(pWal, 3);
|
code = walRollback(pWal, 3);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(pWal->vers.lastVer, 2);
|
ASSERT_EQ(pWal->vers.lastVer, 2);
|
||||||
code = walWriteMeta(pWal);
|
code = walSaveMeta(pWal);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,9 +231,9 @@ TEST_F(WalCleanDeleteEnv, roll) {
|
||||||
ASSERT_EQ(pWal->vers.commitVer, i);
|
ASSERT_EQ(pWal->vers.commitVer, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
walBeginTakeSnapshot(pWal, i-1);
|
walBeginSnapshot(pWal, i-1);
|
||||||
ASSERT_EQ(pWal->vers.verInSnapshotting, i-1);
|
ASSERT_EQ(pWal->vers.verInSnapshotting, i-1);
|
||||||
walEndTakeSnapshot(pWal);
|
walEndSnapshot(pWal);
|
||||||
ASSERT_EQ(pWal->vers.snapshotVer, i-1);
|
ASSERT_EQ(pWal->vers.snapshotVer, i-1);
|
||||||
ASSERT_EQ(pWal->vers.verInSnapshotting, -1);
|
ASSERT_EQ(pWal->vers.verInSnapshotting, -1);
|
||||||
|
|
||||||
|
@ -247,9 +247,9 @@ TEST_F(WalCleanDeleteEnv, roll) {
|
||||||
ASSERT_EQ(pWal->vers.commitVer, i);
|
ASSERT_EQ(pWal->vers.commitVer, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = walBeginTakeSnapshot(pWal, i - 1);
|
code = walBeginSnapshot(pWal, i - 1);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
code = walEndTakeSnapshot(pWal);
|
code = walEndSnapshot(pWal);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue