refactor wal
This commit is contained in:
parent
e123467221
commit
6de9692b7d
|
@ -39,6 +39,14 @@ typedef enum {
|
|||
} EWalType;
|
||||
|
||||
typedef struct {
|
||||
//union {
|
||||
//uint32_t info;
|
||||
//struct {
|
||||
//uint32_t sver:3;
|
||||
//uint32_t msgtype: 5;
|
||||
//uint32_t reserved : 24;
|
||||
//};
|
||||
//};
|
||||
int8_t sver;
|
||||
uint8_t msgType;
|
||||
int8_t reserved[2];
|
||||
|
@ -71,13 +79,17 @@ typedef struct {
|
|||
#define WAL_FILESET_MAX 128
|
||||
|
||||
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
|
||||
#define WAL_CUR_POS_READ_ONLY 1
|
||||
#define WAL_CUR_FILE_READ_ONLY 2
|
||||
#define WAL_CUR_POS_WRITABLE 1
|
||||
#define WAL_CUR_FILE_WRITABLE 2
|
||||
#define WAL_CUR_FAILED 4
|
||||
|
||||
typedef struct SWal {
|
||||
// cfg
|
||||
int32_t vgId;
|
||||
int32_t fsyncPeriod; // millisecond
|
||||
int32_t fsyncSeq;
|
||||
int32_t rollPeriod; // second
|
||||
int64_t segSize;
|
||||
EWalType level;
|
||||
//reference
|
||||
int64_t refId;
|
||||
|
@ -86,7 +98,7 @@ typedef struct SWal {
|
|||
int64_t curIdxTfd;
|
||||
//current version
|
||||
int64_t curVersion;
|
||||
int64_t curOffset;
|
||||
int64_t curLogOffset;
|
||||
//current file version
|
||||
int64_t curFileFirstVersion;
|
||||
int64_t curFileLastVersion;
|
||||
|
@ -94,8 +106,10 @@ typedef struct SWal {
|
|||
int64_t firstVersion;
|
||||
int64_t snapshotVersion;
|
||||
int64_t lastVersion;
|
||||
//fsync status
|
||||
int32_t fsyncSeq;
|
||||
int64_t lastFileName;
|
||||
//roll status
|
||||
int64_t lastRollSeq;
|
||||
int64_t lastFileWriteSize;
|
||||
//ctl
|
||||
int32_t curStatus;
|
||||
pthread_mutex_t mutex;
|
||||
|
@ -119,12 +133,10 @@ int32_t walAlter(SWal *, SWalCfg *pCfg);
|
|||
void walClose(SWal *);
|
||||
|
||||
// write
|
||||
//int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen);
|
||||
int64_t walWrite(SWal *, int64_t index, uint8_t msgType, void *body, int32_t bodyLen);
|
||||
//int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize);
|
||||
void walFsync(SWal *, bool force);
|
||||
|
||||
// apis for lifecycle management
|
||||
void walFsync(SWal *, bool force);
|
||||
int32_t walCommit(SWal *, int64_t ver);
|
||||
// truncate after
|
||||
int32_t walRollback(SWal *, int64_t ver);
|
||||
|
|
|
@ -28,6 +28,7 @@ void tfCleanup();
|
|||
|
||||
// the same syntax as UNIX standard open/close/read/write
|
||||
// but FD is int64_t and will never be reused
|
||||
int64_t tfOpenRead(const char *pathname);
|
||||
int64_t tfOpenReadWrite(const char *pathname);
|
||||
int64_t tfOpenCreateWrite(const char *pathname);
|
||||
int64_t tfOpenCreateWriteAppend(const char *pathname);
|
||||
|
|
|
@ -17,14 +17,16 @@
|
|||
#define _TD_WAL_INT_H_
|
||||
|
||||
#include "wal.h"
|
||||
#include "compare.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int walRotate(SWal* pWal);
|
||||
int walGetFile(SWal* pWal, int32_t version);
|
||||
|
||||
int64_t walGetSeq();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -20,9 +20,38 @@
|
|||
#include "tfile.h"
|
||||
#include "walInt.h"
|
||||
|
||||
int walSeekVerImpl(SWal *pWal, int64_t ver) {
|
||||
//close old file
|
||||
static int walSeekFilePos(SWal* pWal, int64_t ver) {
|
||||
int code = 0;
|
||||
|
||||
int64_t idxTfd = pWal->curIdxTfd;
|
||||
int64_t logTfd = pWal->curLogTfd;
|
||||
|
||||
//seek position
|
||||
int64_t offset = (ver - pWal->curFileFirstVersion) * WAL_IDX_ENTRY_SIZE;
|
||||
code = tfLseek(idxTfd, offset, SEEK_SET);
|
||||
if(code != 0) {
|
||||
|
||||
}
|
||||
int64_t readBuf[2];
|
||||
code = tfRead(idxTfd, readBuf, sizeof(readBuf));
|
||||
if(code != 0) {
|
||||
|
||||
}
|
||||
//TODO:deserialize
|
||||
ASSERT(readBuf[0] == ver);
|
||||
code = tfLseek(logTfd, readBuf[1], SEEK_CUR);
|
||||
if (code != 0) {
|
||||
|
||||
}
|
||||
pWal->curLogOffset = readBuf[1];
|
||||
pWal->curVersion = ver;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int walChangeFile(SWal *pWal, int64_t ver) {
|
||||
int code = 0;
|
||||
int64_t idxTfd, logTfd;
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
code = tfClose(pWal->curLogTfd);
|
||||
if(code != 0) {
|
||||
//TODO
|
||||
|
@ -32,29 +61,36 @@ int walSeekVerImpl(SWal *pWal, int64_t ver) {
|
|||
//TODO
|
||||
}
|
||||
//bsearch in fileSet
|
||||
int fName = 0;//TODO
|
||||
//open the right file
|
||||
char fNameStr[WAL_FILE_LEN];
|
||||
sprintf(fNameStr, "%d."WAL_INDEX_SUFFIX, fName);
|
||||
bool closed = 1; //TODO:read only
|
||||
int64_t idxTfd = tfOpenReadWrite(fNameStr);
|
||||
sprintf(fNameStr, "%d."WAL_LOG_SUFFIX, fName);
|
||||
int64_t logTfd = tfOpenReadWrite(fNameStr);
|
||||
//seek position
|
||||
int64_t offset = (ver - fName) * WAL_IDX_ENTRY_SIZE;
|
||||
tfLseek(idxTfd, offset, SEEK_SET);
|
||||
//set cur version, cur file version and cur status
|
||||
pWal->curFileFirstVersion = fName;
|
||||
pWal->curFileLastVersion = 1;//TODO
|
||||
int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE);
|
||||
ASSERT(pRet != NULL);
|
||||
int64_t fname = *pRet;
|
||||
if(fname < pWal->lastFileName) {
|
||||
pWal->curStatus &= ~WAL_CUR_FILE_WRITABLE;
|
||||
pWal->curFileLastVersion = pRet[1]-1;
|
||||
sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname);
|
||||
idxTfd = tfOpenRead(fnameStr);
|
||||
sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname);
|
||||
logTfd = tfOpenRead(fnameStr);
|
||||
} else {
|
||||
pWal->curStatus |= WAL_CUR_FILE_WRITABLE;
|
||||
pWal->curFileLastVersion = -1;
|
||||
sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname);
|
||||
idxTfd = tfOpenReadWrite(fnameStr);
|
||||
sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname);
|
||||
logTfd = tfOpenReadWrite(fnameStr);
|
||||
}
|
||||
|
||||
pWal->curFileFirstVersion = fname;
|
||||
pWal->curLogTfd = logTfd;
|
||||
pWal->curIdxTfd = idxTfd;
|
||||
pWal->curVersion = ver;
|
||||
pWal->curOffset = offset;
|
||||
pWal->curStatus = 0;//TODO
|
||||
return code;
|
||||
}
|
||||
|
||||
int walSeekVer(SWal *pWal, int64_t ver) {
|
||||
if((!(pWal->curStatus & WAL_CUR_FAILED))
|
||||
&& ver == pWal->curVersion) {
|
||||
return 0;
|
||||
}
|
||||
if(ver > pWal->lastVersion) {
|
||||
//TODO: some records are skipped
|
||||
return -1;
|
||||
|
@ -64,54 +100,13 @@ int walSeekVer(SWal *pWal, int64_t ver) {
|
|||
return -1;
|
||||
}
|
||||
if(ver < pWal->snapshotVersion) {
|
||||
//TODO: seek snapshotted log
|
||||
//TODO: seek snapshotted log, invalid in some cases
|
||||
}
|
||||
if(ver >= pWal->curFileFirstVersion
|
||||
&& ((pWal->curFileLastVersion == -1 && ver <= pWal->lastVersion) || (ver <= pWal->curFileLastVersion))) {
|
||||
|
||||
}
|
||||
if(ver < pWal->curFileFirstVersion || (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) {
|
||||
int index = 0;
|
||||
index = 1;
|
||||
//back up to avoid inconsistency
|
||||
int64_t curVersion = pWal->curVersion;
|
||||
int64_t curOffset = pWal->curOffset;
|
||||
int64_t curFileFirstVersion = pWal->curFileFirstVersion;
|
||||
int64_t curFileLastVersion = pWal->curFileLastVersion;
|
||||
if(walSeekVerImpl(pWal, ver) < 0) {
|
||||
//TODO: errno
|
||||
pWal->curVersion = curVersion;
|
||||
pWal->curOffset = curOffset;
|
||||
pWal->curFileFirstVersion = curFileFirstVersion;
|
||||
pWal->curFileLastVersion = curFileLastVersion;
|
||||
return -1;
|
||||
}
|
||||
if(ver < pWal->curFileFirstVersion ||
|
||||
(pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) {
|
||||
walChangeFile(pWal, ver);
|
||||
}
|
||||
walSeekFilePos(pWal, ver);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||
int code = 0;
|
||||
//get index file
|
||||
if(!tfValid(pWal->curIdxTfd)) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
|
||||
}
|
||||
if(pWal->curVersion != ver) {
|
||||
if(walSeekVer(pWal, ver) != 0) {
|
||||
//TODO: some records are skipped
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
//check file checksum
|
||||
//append index
|
||||
return 0;
|
||||
}
|
||||
|
||||
int walRotateIndex(SWal *pWal) {
|
||||
//check file checksum
|
||||
//create new file
|
||||
//switch file
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -26,57 +26,44 @@ int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
|
|||
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
|
||||
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
|
||||
|
||||
static pthread_mutex_t walInitLock = PTHREAD_MUTEX_INITIALIZER;
|
||||
static int8_t walInited = 0;
|
||||
|
||||
typedef struct {
|
||||
int32_t refSetId;
|
||||
int32_t seq;
|
||||
uint32_t seq;
|
||||
int8_t stop;
|
||||
int8_t inited;
|
||||
pthread_t thread;
|
||||
pthread_mutex_t mutex;
|
||||
} SWalMgmt;
|
||||
|
||||
static SWalMgmt tsWal = {0};
|
||||
static SWalMgmt tsWal = {0, .seq = 1};
|
||||
static int32_t walCreateThread();
|
||||
static void walStopThread();
|
||||
static int32_t walInitObj(SWal *pWal);
|
||||
static void walFreeObj(void *pWal);
|
||||
|
||||
int32_t walInit() {
|
||||
//TODO: change to atomic
|
||||
pthread_mutex_lock(&walInitLock);
|
||||
if(walInited) {
|
||||
pthread_mutex_unlock(&walInitLock);
|
||||
return 0;
|
||||
} else {
|
||||
walInited = 1;
|
||||
pthread_mutex_unlock(&walInitLock);
|
||||
}
|
||||
int64_t walGetSeq() {
|
||||
return (int64_t)atomic_load_32(&tsWal.seq);
|
||||
}
|
||||
|
||||
int32_t walInit() {
|
||||
int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1);
|
||||
if(old == 1) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
|
||||
|
||||
code = pthread_mutex_init(&tsWal.mutex, NULL);
|
||||
if (code != 0) {
|
||||
wError("failed to init wal mutex since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
code = walCreateThread();
|
||||
int code = walCreateThread();
|
||||
if (code != 0) {
|
||||
wError("failed to init wal module since %s", tstrerror(code));
|
||||
atomic_store_8(&tsWal.inited, 0);
|
||||
return code;
|
||||
}
|
||||
|
||||
wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId);
|
||||
return code;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void walCleanUp() {
|
||||
walStopThread();
|
||||
taosCloseRef(tsWal.refSetId);
|
||||
pthread_mutex_destroy(&tsWal.mutex);
|
||||
wInfo("wal module is cleaned up");
|
||||
}
|
||||
|
||||
|
@ -92,7 +79,7 @@ static int walLoadFileset(SWal *pWal) {
|
|||
char *name = ent->d_name;
|
||||
name[WAL_NOSUFFIX_LEN] = 0;
|
||||
//validate file name by regex matching
|
||||
if(1 /* regex match */) {
|
||||
if(1 /* TODO:regex match */) {
|
||||
int64_t fnameInt64 = atoll(name);
|
||||
taosArrayPush(pWal->fileSet, &fnameInt64);
|
||||
}
|
||||
|
@ -133,6 +120,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
|||
walFreeObj(pWal);
|
||||
return NULL;
|
||||
}
|
||||
walLoadFileset(pWal);
|
||||
|
||||
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod);
|
||||
|
||||
|
@ -164,6 +152,9 @@ void walClose(SWal *pWal) {
|
|||
|
||||
pthread_mutex_lock(&pWal->mutex);
|
||||
tfClose(pWal->curLogTfd);
|
||||
tfClose(pWal->curIdxTfd);
|
||||
taosArrayDestroy(pWal->fileSet);
|
||||
pWal->fileSet = NULL;
|
||||
pthread_mutex_unlock(&pWal->mutex);
|
||||
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
||||
}
|
||||
|
@ -188,6 +179,9 @@ static void walFreeObj(void *wal) {
|
|||
wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal);
|
||||
|
||||
tfClose(pWal->curLogTfd);
|
||||
tfClose(pWal->curIdxTfd);
|
||||
taosArrayDestroy(pWal->fileSet);
|
||||
pWal->fileSet = NULL;
|
||||
pthread_mutex_destroy(&pWal->mutex);
|
||||
tfree(pWal);
|
||||
}
|
||||
|
@ -197,7 +191,7 @@ static bool walNeedFsync(SWal *pWal) {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (tsWal.seq % pWal->fsyncSeq == 0) {
|
||||
if (atomic_load_32(&tsWal.seq) % pWal->fsyncSeq == 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -206,16 +200,14 @@ static bool walNeedFsync(SWal *pWal) {
|
|||
|
||||
static void walUpdateSeq() {
|
||||
taosMsleep(WAL_REFRESH_MS);
|
||||
if (++tsWal.seq <= 0) {
|
||||
tsWal.seq = 1;
|
||||
}
|
||||
atomic_add_fetch_32(&tsWal.seq, 1);
|
||||
}
|
||||
|
||||
static void walFsyncAll() {
|
||||
SWal *pWal = taosIterateRef(tsWal.refSetId, 0);
|
||||
while (pWal) {
|
||||
if (walNeedFsync(pWal)) {
|
||||
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq);
|
||||
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq));
|
||||
int32_t code = tfFsync(pWal->curLogTfd);
|
||||
if (code != 0) {
|
||||
wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(code));
|
||||
|
@ -226,16 +218,12 @@ static void walFsyncAll() {
|
|||
}
|
||||
|
||||
static void *walThreadFunc(void *param) {
|
||||
int stop = 0;
|
||||
setThreadName("wal");
|
||||
while (1) {
|
||||
walUpdateSeq();
|
||||
walFsyncAll();
|
||||
|
||||
pthread_mutex_lock(&tsWal.mutex);
|
||||
stop = tsWal.stop;
|
||||
pthread_mutex_unlock(&tsWal.mutex);
|
||||
if (stop) break;
|
||||
if (atomic_load_8(&tsWal.stop)) break;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -258,9 +246,7 @@ static int32_t walCreateThread() {
|
|||
}
|
||||
|
||||
static void walStopThread() {
|
||||
pthread_mutex_lock(&tsWal.mutex);
|
||||
tsWal.stop = 1;
|
||||
pthread_mutex_unlock(&tsWal.mutex);
|
||||
atomic_store_8(&tsWal.stop, 1);
|
||||
|
||||
if (taosCheckPthreadValid(tsWal.thread)) {
|
||||
pthread_join(tsWal.thread, NULL);
|
||||
|
|
|
@ -26,10 +26,24 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
|
|||
}
|
||||
|
||||
int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||
//TODO: ftruncate
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
|
||||
pWal->snapshotVersion = ver;
|
||||
|
||||
//mark files safe to delete
|
||||
int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE);
|
||||
if(pRet != pWal->fileSet->pData) {
|
||||
//delete files until less than retention size
|
||||
|
||||
//find first file that exceeds retention time
|
||||
|
||||
}
|
||||
|
||||
//delete files living longer than retention limit
|
||||
//remove file from fileset
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -124,13 +138,102 @@ void walRemoveAllOldFiles(void *handle) {
|
|||
}
|
||||
#endif
|
||||
|
||||
static int walRoll(SWal *pWal) {
|
||||
int code = 0;
|
||||
code = tfClose(pWal->curIdxTfd);
|
||||
if(code != 0) {
|
||||
return code;
|
||||
}
|
||||
code = tfClose(pWal->curLogTfd);
|
||||
if(code != 0) {
|
||||
return code;
|
||||
}
|
||||
int64_t idxTfd, logTfd;
|
||||
//create new file
|
||||
int64_t newFileFirstVersion = pWal->lastVersion + 1;
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, newFileFirstVersion);
|
||||
idxTfd = tfOpenCreateWrite(fnameStr);
|
||||
sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, newFileFirstVersion);
|
||||
logTfd = tfOpenCreateWrite(fnameStr);
|
||||
|
||||
taosArrayPush(pWal->fileSet, &newFileFirstVersion);
|
||||
|
||||
//switch file
|
||||
pWal->curIdxTfd = idxTfd;
|
||||
pWal->curLogTfd = logTfd;
|
||||
//change status
|
||||
pWal->curFileLastVersion = -1;
|
||||
pWal->curFileFirstVersion = newFileFirstVersion;
|
||||
pWal->curVersion = newFileFirstVersion;
|
||||
pWal->curLogOffset = 0;
|
||||
pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE;
|
||||
|
||||
pWal->lastFileName = newFileFirstVersion;
|
||||
pWal->lastFileWriteSize = 0;
|
||||
pWal->lastRollSeq = walGetSeq();
|
||||
return 0;
|
||||
}
|
||||
|
||||
int walChangeFileToLast(SWal *pWal) {
|
||||
int64_t idxTfd, logTfd;
|
||||
int64_t* pRet = taosArrayGetLast(pWal->fileSet);
|
||||
ASSERT(pRet != NULL);
|
||||
int64_t fname = *pRet;
|
||||
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname);
|
||||
idxTfd = tfOpenReadWrite(fnameStr);
|
||||
sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname);
|
||||
logTfd = tfOpenReadWrite(fnameStr);
|
||||
//switch file
|
||||
pWal->curIdxTfd = idxTfd;
|
||||
pWal->curLogTfd = logTfd;
|
||||
//change status
|
||||
pWal->curFileLastVersion = -1;
|
||||
pWal->curFileFirstVersion = fname;
|
||||
pWal->curVersion = fname;
|
||||
pWal->curLogOffset = 0;
|
||||
pWal->curStatus = WAL_CUR_FILE_WRITABLE;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||
int code = 0;
|
||||
//get index file
|
||||
if(!tfValid(pWal->curIdxTfd)) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
|
||||
}
|
||||
int64_t writeBuf[2] = { ver, offset };
|
||||
int size = tfWrite(pWal->curIdxTfd, writeBuf, sizeof(writeBuf));
|
||||
if(size != sizeof(writeBuf)) {
|
||||
//TODO:
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t bodyLen) {
|
||||
if (pWal == NULL) return -1;
|
||||
|
||||
// no wal
|
||||
if (!tfValid(pWal->curLogTfd)) return 0;
|
||||
if (pWal->level == TAOS_WAL_NOLOG) return 0;
|
||||
if (index > pWal->lastVersion + 1) return -1;
|
||||
|
||||
if (index == pWal->lastVersion + 1) {
|
||||
int64_t passed = walGetSeq() - pWal->lastRollSeq;
|
||||
if(passed > pWal->rollPeriod) {
|
||||
walRoll(pWal);
|
||||
} else if(pWal->lastFileWriteSize > pWal->segSize) {
|
||||
walRoll(pWal);
|
||||
} else {
|
||||
walChangeFileToLast(pWal);
|
||||
}
|
||||
} else {
|
||||
//reject skip log or rewrite log
|
||||
//must truncate explicitly first
|
||||
return -1;
|
||||
}
|
||||
if (!tfValid(pWal->curLogTfd)) return 0;
|
||||
|
||||
pWal->head.version = index;
|
||||
int32_t code = 0;
|
||||
|
@ -155,7 +258,11 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t
|
|||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
|
||||
}
|
||||
//TODO:write idx
|
||||
walWriteIndex(pWal, index, pWal->curLogOffset);
|
||||
pWal->curLogOffset += sizeof(SWalHead) + bodyLen;
|
||||
|
||||
//set status
|
||||
pWal->lastVersion = index;
|
||||
|
||||
pthread_mutex_unlock(&pWal->mutex);
|
||||
|
||||
|
|
|
@ -53,6 +53,11 @@ static int64_t tfOpenImp(int32_t fd) {
|
|||
return rid;
|
||||
}
|
||||
|
||||
int64_t tfOpenRead(const char *pathname, int32_t flags) {
|
||||
int32_t fd = taosOpenFileRead(pathname);
|
||||
return tfOpenImp(fd);
|
||||
}
|
||||
|
||||
int64_t tfOpenReadWrite(const char *pathname, int32_t flags) {
|
||||
int32_t fd = taosOpenFileReadWrite(pathname);
|
||||
return tfOpenImp(fd);
|
||||
|
|
Loading…
Reference in New Issue