add wal implementation
This commit is contained in:
parent
081cc7cf6d
commit
044b02bb60
|
@ -16,11 +16,21 @@
|
||||||
#define _TD_WAL_H_
|
#define _TD_WAL_H_
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "tdef.h"
|
||||||
|
#include "tlog.h"
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
extern int32_t wDebugFlag;
|
||||||
|
|
||||||
|
#define wFatal(...) { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", 255, __VA_ARGS__); }}
|
||||||
|
#define wError(...) { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", 255, __VA_ARGS__); }}
|
||||||
|
#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", 255, __VA_ARGS__); }}
|
||||||
|
#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL ", 255, __VA_ARGS__); }}
|
||||||
|
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
|
||||||
|
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TAOS_WAL_NOLOG = 0,
|
TAOS_WAL_NOLOG = 0,
|
||||||
TAOS_WAL_WRITE = 1,
|
TAOS_WAL_WRITE = 1,
|
||||||
|
@ -28,9 +38,8 @@ typedef enum {
|
||||||
} EWalType;
|
} EWalType;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t msgType;
|
int8_t sver;
|
||||||
int8_t sver; // sver 2 for WAL SDataRow/SMemRow compatibility
|
int8_t reserved[3];
|
||||||
int8_t reserved[2];
|
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int64_t version;
|
int64_t version;
|
||||||
uint32_t signature;
|
uint32_t signature;
|
||||||
|
@ -44,11 +53,33 @@ typedef struct {
|
||||||
EWalType walLevel; // wal level
|
EWalType walLevel; // wal level
|
||||||
} SWalCfg;
|
} SWalCfg;
|
||||||
|
|
||||||
|
#define WAL_PREFIX "wal"
|
||||||
|
#define WAL_PREFIX_LEN 3
|
||||||
|
#define WAL_REFRESH_MS 1000
|
||||||
|
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
|
||||||
|
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
|
||||||
|
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
||||||
|
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||||
|
#define WAL_FILE_NUM 1 // 3
|
||||||
|
|
||||||
typedef struct SWal {
|
typedef struct SWal {
|
||||||
int8_t unused;
|
int64_t version;
|
||||||
|
int64_t fileId;
|
||||||
|
int64_t rId;
|
||||||
|
int64_t tfd;
|
||||||
|
int32_t vgId;
|
||||||
|
int32_t keep;
|
||||||
|
int32_t level;
|
||||||
|
int32_t fsyncPeriod;
|
||||||
|
int32_t fsyncSeq;
|
||||||
|
int8_t stop;
|
||||||
|
int8_t reseved[3];
|
||||||
|
char path[WAL_PATH_LEN];
|
||||||
|
char name[WAL_FILE_LEN];
|
||||||
|
pthread_mutex_t mutex;
|
||||||
} SWal; // WAL HANDLE
|
} SWal; // WAL HANDLE
|
||||||
|
|
||||||
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
|
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, void *pMsg);
|
||||||
|
|
||||||
// module initialization
|
// module initialization
|
||||||
int32_t walInit();
|
int32_t walInit();
|
||||||
|
@ -82,6 +113,11 @@ int64_t walGetSnapshotVer(SWal *);
|
||||||
int64_t walGetLastVer(SWal *);
|
int64_t walGetLastVer(SWal *);
|
||||||
// int32_t walDataCorrupted(SWal*);
|
// int32_t walDataCorrupted(SWal*);
|
||||||
|
|
||||||
|
//internal
|
||||||
|
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
|
||||||
|
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
|
||||||
|
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -46,6 +46,7 @@ extern "C" {
|
||||||
#include <math.h>
|
#include <math.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
|
#include <dirent.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
#include "osAtomic.h"
|
#include "osAtomic.h"
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
#ifndef _TD_UTIL_LOG_H
|
#ifndef _TD_UTIL_LOG_H
|
||||||
#define _TD_UTIL_LOG_H
|
#define _TD_UTIL_LOG_H
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -53,7 +53,7 @@ void taosNotePrintBuffer(SNoteObj *pNote, char *buffer, int32_t len);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define nInfo(buffer, len) \
|
#define nInfo(buffer, len) \
|
||||||
if (tscEmbedded == 1) { \
|
if (tscEmbeddedInUtil == 1) { \
|
||||||
taosNotePrintBuffer(&tsInfoNote, buffer, len); \
|
taosNotePrintBuffer(&tsInfoNote, buffer, len); \
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,20 +20,21 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
|
||||||
extern int32_t uDebugFlag;
|
extern int32_t uDebugFlag;
|
||||||
extern int8_t tscEmbedded;
|
extern int8_t tscEmbeddedInUtil;
|
||||||
|
|
||||||
#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
|
#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
|
||||||
#define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
|
#define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
|
||||||
#define uWarn(...) { if (uDebugFlag & DEBUG_WARN) { taosPrintLog("UTL WARN ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
|
#define uWarn(...) { if (uDebugFlag & DEBUG_WARN) { taosPrintLog("UTL WARN ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
|
||||||
#define uInfo(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLog("UTL ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
|
#define uInfo(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLog("UTL ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
|
||||||
#define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }}
|
#define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }}
|
||||||
#define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }}
|
#define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }}
|
||||||
|
|
||||||
#define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); }
|
#define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); }
|
||||||
#define pPrint(...) { taosPrintLog("APP ", 255, __VA_ARGS__); }
|
#define pPrint(...) { taosPrintLog("APP ", 255, __VA_ARGS__); }
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,4 +9,5 @@ target_include_directories(
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
wal
|
wal
|
||||||
PUBLIC os
|
PUBLIC os
|
||||||
|
PUBLIC util
|
||||||
)
|
)
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
#ifndef _TD_WAL_INT_H_
|
#ifndef _TD_WAL_INT_H_
|
||||||
#define _TD_WAL_INT_H_
|
#define _TD_WAL_INT_H_
|
||||||
|
|
||||||
|
#include "wal.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
@ -24,4 +26,4 @@ extern "C" {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_WAL_INT_H_*/
|
#endif /*_TD_WAL_INT_H_*/
|
||||||
|
|
|
@ -15,40 +15,35 @@
|
||||||
|
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
||||||
int32_t walInit() { return 0; }
|
int32_t walCommit(SWal *pWal, int64_t ver) {
|
||||||
|
|
||||||
void walCleanUp() {}
|
|
||||||
|
|
||||||
SWal *walOpen(char *path, SWalCfg *pCfg) {
|
|
||||||
SWal* pWal = malloc(sizeof(SWal));
|
|
||||||
if(pWal == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
return pWal;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; }
|
|
||||||
|
|
||||||
void walClose(SWal *pWal) {
|
|
||||||
if(pWal) free(pWal);
|
|
||||||
}
|
|
||||||
|
|
||||||
void walFsync(SWal *pWal, bool force) {}
|
|
||||||
|
|
||||||
int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walCommit(SWal *pWal, int64_t ver) { return 0; }
|
int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t walRollback(SWal *pWal, int64_t ver) { return 0; }
|
int32_t walPrune(SWal *pWal, int64_t ver) {
|
||||||
|
return 0;
|
||||||
int32_t walPrune(SWal *pWal, int64_t ver) { return 0; }
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t walRead(SWal *, SWalHead **, int64_t ver);
|
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
|
||||||
int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t walGetFirstVer(SWal *);
|
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
|
||||||
int64_t walGetSnapshotVer(SWal *);
|
return 0;
|
||||||
int64_t walGetLastVer(SWal *);
|
}
|
||||||
|
|
||||||
|
int64_t walGetFirstVer(SWal *pWal) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t walGetSnapshotVer(SWal *pWal) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t walGetLastVer(SWal *pWal) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
#include "tfile.h"
|
#include "tfile.h"
|
||||||
#include "twal.h"
|
|
||||||
#include "walInt.h"
|
#include "walInt.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -62,8 +61,8 @@ void walCleanUp() {
|
||||||
wInfo("wal module is cleaned up");
|
wInfo("wal module is cleaned up");
|
||||||
}
|
}
|
||||||
|
|
||||||
void *walOpen(char *path, SWalCfg *pCfg) {
|
SWal *walOpen(char *path, SWalCfg *pCfg) {
|
||||||
SWal *pWal = tcalloc(1, sizeof(SWal));
|
SWal *pWal = malloc(sizeof(SWal));
|
||||||
if (pWal == NULL) {
|
if (pWal == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -73,7 +72,7 @@ void *walOpen(char *path, SWalCfg *pCfg) {
|
||||||
pWal->tfd = -1;
|
pWal->tfd = -1;
|
||||||
pWal->fileId = -1;
|
pWal->fileId = -1;
|
||||||
pWal->level = pCfg->walLevel;
|
pWal->level = pCfg->walLevel;
|
||||||
pWal->keep = pCfg->keep;
|
/*pWal->keep = pCfg->keep;*/
|
||||||
pWal->fsyncPeriod = pCfg->fsyncPeriod;
|
pWal->fsyncPeriod = pCfg->fsyncPeriod;
|
||||||
tstrncpy(pWal->path, path, sizeof(pWal->path));
|
tstrncpy(pWal->path, path, sizeof(pWal->path));
|
||||||
pthread_mutex_init(&pWal->mutex, NULL);
|
pthread_mutex_init(&pWal->mutex, NULL);
|
||||||
|
@ -86,8 +85,8 @@ void *walOpen(char *path, SWalCfg *pCfg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pWal->rid = taosAddRef(tsWal.refId, pWal);
|
pWal->rId = taosAddRef(tsWal.refId, pWal);
|
||||||
if (pWal->rid < 0) {
|
if (pWal->rId < 0) {
|
||||||
walFreeObj(pWal);
|
walFreeObj(pWal);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -97,9 +96,8 @@ void *walOpen(char *path, SWalCfg *pCfg) {
|
||||||
return pWal;
|
return pWal;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walAlter(void *handle, SWalCfg *pCfg) {
|
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
|
||||||
if (handle == NULL) return TSDB_CODE_WAL_APP_ERROR;
|
if (pWal == NULL) return TSDB_CODE_WAL_APP_ERROR;
|
||||||
SWal *pWal = handle;
|
|
||||||
|
|
||||||
if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) {
|
if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) {
|
||||||
wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->vgId, pWal->level,
|
wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->vgId, pWal->level,
|
||||||
|
@ -128,14 +126,13 @@ void walStop(void *handle) {
|
||||||
wDebug("vgId:%d, stop write wal", pWal->vgId);
|
wDebug("vgId:%d, stop write wal", pWal->vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void walClose(void *handle) {
|
void walClose(SWal *pWal) {
|
||||||
if (handle == NULL) return;
|
if (pWal == NULL) return;
|
||||||
|
|
||||||
SWal *pWal = handle;
|
|
||||||
pthread_mutex_lock(&pWal->mutex);
|
pthread_mutex_lock(&pWal->mutex);
|
||||||
tfClose(pWal->tfd);
|
tfClose(pWal->tfd);
|
||||||
pthread_mutex_unlock(&pWal->mutex);
|
pthread_mutex_unlock(&pWal->mutex);
|
||||||
taosRemoveRef(tsWal.refId, pWal->rid);
|
taosRemoveRef(tsWal.refId, pWal->rId);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t walInitObj(SWal *pWal) {
|
static int32_t walInitObj(SWal *pWal) {
|
||||||
|
@ -186,7 +183,7 @@ static void walFsyncAll() {
|
||||||
wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code));
|
wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pWal = taosIterateRef(tsWal.refId, pWal->rid);
|
pWal = taosIterateRef(tsWal.refId, pWal->rId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,4 +115,4 @@ int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) {
|
||||||
wTrace("vgId:%d, path:%s, newFileId:%" PRId64, pWal->vgId, pWal->path, *newFileId);
|
wTrace("vgId:%d, path:%s, newFileId:%" PRId64, pWal->vgId, pWal->path, *newFileId);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
|
@ -14,13 +14,11 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#define TAOS_RANDOM_FILE_FAIL_TEST
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "taosmsg.h"
|
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
#include "tfile.h"
|
#include "tfile.h"
|
||||||
#include "twal.h"
|
|
||||||
#include "walInt.h"
|
#include "walInt.h"
|
||||||
|
|
||||||
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId);
|
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId);
|
||||||
|
@ -43,12 +41,12 @@ int32_t walRenew(void *handle) {
|
||||||
wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->name);
|
wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pWal->keep == TAOS_WAL_KEEP) {
|
/*if (pWal->keep == TAOS_WAL_KEEP) {*/
|
||||||
pWal->fileId = 0;
|
/*pWal->fileId = 0;*/
|
||||||
} else {
|
/*} else {*/
|
||||||
if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0;
|
/*if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0;*/
|
||||||
pWal->fileId++;
|
/*pWal->fileId++;*/
|
||||||
}
|
/*}*/
|
||||||
|
|
||||||
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
|
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
|
||||||
pWal->tfd = tfOpenCreateWrite(pWal->name);
|
pWal->tfd = tfOpenCreateWrite(pWal->name);
|
||||||
|
@ -68,7 +66,7 @@ int32_t walRenew(void *handle) {
|
||||||
void walRemoveOneOldFile(void *handle) {
|
void walRemoveOneOldFile(void *handle) {
|
||||||
SWal *pWal = handle;
|
SWal *pWal = handle;
|
||||||
if (pWal == NULL) return;
|
if (pWal == NULL) return;
|
||||||
if (pWal->keep == TAOS_WAL_KEEP) return;
|
/*if (pWal->keep == TAOS_WAL_KEEP) return;*/
|
||||||
if (!tfValid(pWal->tfd)) return;
|
if (!tfValid(pWal->tfd)) return;
|
||||||
|
|
||||||
pthread_mutex_lock(&pWal->mutex);
|
pthread_mutex_lock(&pWal->mutex);
|
||||||
|
@ -117,7 +115,7 @@ void walRemoveAllOldFiles(void *handle) {
|
||||||
static void walUpdateChecksum(SWalHead *pHead) {
|
static void walUpdateChecksum(SWalHead *pHead) {
|
||||||
pHead->sver = 2;
|
pHead->sver = 2;
|
||||||
pHead->cksum = 0;
|
pHead->cksum = 0;
|
||||||
pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(*pHead) + pHead->len);
|
pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(SWalHead) + pHead->len);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int walValidateChecksum(SWalHead *pHead) {
|
static int walValidateChecksum(SWalHead *pHead) {
|
||||||
|
@ -134,10 +132,14 @@ static int walValidateChecksum(SWalHead *pHead) {
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t walWrite(void *handle, SWalHead *pHead) {
|
int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {
|
||||||
if (handle == NULL) return -1;
|
if (pWal == NULL) return -1;
|
||||||
|
|
||||||
SWal * pWal = handle;
|
SWalHead *pHead = malloc(sizeof(SWalHead) + bodyLen);
|
||||||
|
if(pHead == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pHead->version = index;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
// no wal
|
// no wal
|
||||||
|
@ -146,6 +148,9 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
|
||||||
if (pHead->version <= pWal->version) return 0;
|
if (pHead->version <= pWal->version) return 0;
|
||||||
|
|
||||||
pHead->signature = WAL_SIGNATURE;
|
pHead->signature = WAL_SIGNATURE;
|
||||||
|
pHead->len = bodyLen;
|
||||||
|
memcpy(pHead->cont, body, bodyLen);
|
||||||
|
|
||||||
#if defined(WAL_CHECKSUM_WHOLE)
|
#if defined(WAL_CHECKSUM_WHOLE)
|
||||||
walUpdateChecksum(pHead);
|
walUpdateChecksum(pHead);
|
||||||
#else
|
#else
|
||||||
|
@ -173,8 +178,7 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void walFsync(void *handle, bool forceFsync) {
|
void walFsync(SWal *pWal, bool forceFsync) {
|
||||||
SWal *pWal = handle;
|
|
||||||
if (pWal == NULL || !tfValid(pWal->tfd)) return;
|
if (pWal == NULL || !tfValid(pWal->tfd)) return;
|
||||||
|
|
||||||
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
|
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
|
||||||
|
@ -211,7 +215,7 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pWal->keep != TAOS_WAL_KEEP) return TSDB_CODE_SUCCESS;
|
/*if (pWal->keep != TAOS_WAL_KEEP) return TSDB_CODE_SUCCESS;*/
|
||||||
|
|
||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId);
|
wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId);
|
||||||
|
@ -307,119 +311,10 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd,
|
||||||
|
|
||||||
return TSDB_CODE_WAL_FILE_CORRUPTED;
|
return TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
}
|
}
|
||||||
// Add SMemRowType ahead of SDataRow
|
|
||||||
static void expandSubmitBlk(SSubmitBlk *pDest, SSubmitBlk *pSrc, int32_t *lenExpand) {
|
|
||||||
// copy the header firstly
|
|
||||||
memcpy(pDest, pSrc, sizeof(SSubmitBlk));
|
|
||||||
|
|
||||||
int32_t nRows = htons(pDest->numOfRows);
|
|
||||||
int32_t dataLen = htonl(pDest->dataLen);
|
|
||||||
|
|
||||||
if ((nRows <= 0) || (dataLen <= 0)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
char *pDestData = pDest->data;
|
|
||||||
char *pSrcData = pSrc->data;
|
|
||||||
for (int32_t i = 0; i < nRows; ++i) {
|
|
||||||
memRowSetType(pDestData, SMEM_ROW_DATA);
|
|
||||||
memcpy(memRowDataBody(pDestData), pSrcData, dataRowLen(pSrcData));
|
|
||||||
pDestData = POINTER_SHIFT(pDestData, memRowTLen(pDestData));
|
|
||||||
pSrcData = POINTER_SHIFT(pSrcData, dataRowLen(pSrcData));
|
|
||||||
++(*lenExpand);
|
|
||||||
}
|
|
||||||
pDest->dataLen = htonl(dataLen + nRows * sizeof(uint8_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check SDataRow by comparing the SDataRow len and SSubmitBlk dataLen
|
|
||||||
static bool walIsSDataRow(void *pBlkData, int nRows, int32_t dataLen) {
|
|
||||||
if ((nRows <= 0) || (dataLen <= 0)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
int32_t len = 0, kvLen = 0;
|
|
||||||
for (int i = 0; i < nRows; ++i) {
|
|
||||||
len += dataRowLen(pBlkData);
|
|
||||||
if (len > dataLen) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* For SDataRow between version [2.1.5.0 and 2.1.6.X], it would never conflict.
|
|
||||||
* For SKVRow between version [2.1.5.0 and 2.1.6.X], it may conflict in below scenario
|
|
||||||
* - with 1st type byte 0x01 and sversion 0x0101(257), thus do further check
|
|
||||||
*/
|
|
||||||
if (dataRowLen(pBlkData) == 257) {
|
|
||||||
SMemRow memRow = pBlkData;
|
|
||||||
SKVRow kvRow = memRowKvBody(memRow);
|
|
||||||
int nCols = kvRowNCols(kvRow);
|
|
||||||
uint16_t calcTsOffset = (uint16_t)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nCols);
|
|
||||||
uint16_t realTsOffset = (kvRowColIdx(kvRow))->offset;
|
|
||||||
if (calcTsOffset == realTsOffset) {
|
|
||||||
kvLen += memRowKvTLen(memRow);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pBlkData = POINTER_SHIFT(pBlkData, dataRowLen(pBlkData));
|
|
||||||
}
|
|
||||||
if (len != dataLen) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (kvLen == dataLen) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
// for WAL SMemRow/SDataRow compatibility
|
|
||||||
static int walSMemRowCheck(SWalHead *pHead) {
|
|
||||||
if ((pHead->sver < 2) && (pHead->msgType == TSDB_MSG_TYPE_SUBMIT)) {
|
|
||||||
SSubmitMsg *pMsg = (SSubmitMsg *)pHead->cont;
|
|
||||||
int32_t numOfBlocks = htonl(pMsg->numOfBlocks);
|
|
||||||
if (numOfBlocks <= 0) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t nTotalRows = 0;
|
|
||||||
SSubmitBlk *pBlk = (SSubmitBlk *)pMsg->blocks;
|
|
||||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
|
||||||
int32_t dataLen = htonl(pBlk->dataLen);
|
|
||||||
int32_t nRows = htons(pBlk->numOfRows);
|
|
||||||
nTotalRows += nRows;
|
|
||||||
if (!walIsSDataRow(pBlk->data, nRows, dataLen)) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
pBlk = (SSubmitBlk *)POINTER_SHIFT(pBlk, sizeof(SSubmitBlk) + dataLen);
|
|
||||||
}
|
|
||||||
ASSERT(nTotalRows >= 0);
|
|
||||||
SWalHead *pWalHead = (SWalHead *)calloc(sizeof(SWalHead) + pHead->len + nTotalRows * sizeof(uint8_t), 1);
|
|
||||||
if (pWalHead == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(pWalHead, pHead, sizeof(SWalHead) + sizeof(SSubmitMsg));
|
|
||||||
|
|
||||||
SSubmitMsg *pDestMsg = (SSubmitMsg *)pWalHead->cont;
|
|
||||||
SSubmitBlk *pDestBlks = (SSubmitBlk *)pDestMsg->blocks;
|
|
||||||
SSubmitBlk *pSrcBlks = (SSubmitBlk *)pMsg->blocks;
|
|
||||||
int32_t lenExpand = 0;
|
|
||||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
|
||||||
expandSubmitBlk(pDestBlks, pSrcBlks, &lenExpand);
|
|
||||||
pDestBlks = POINTER_SHIFT(pDestBlks, htonl(pDestBlks->dataLen) + sizeof(SSubmitBlk));
|
|
||||||
pSrcBlks = POINTER_SHIFT(pSrcBlks, htonl(pSrcBlks->dataLen) + sizeof(SSubmitBlk));
|
|
||||||
}
|
|
||||||
if (lenExpand > 0) {
|
|
||||||
pDestMsg->header.contLen = htonl(pDestMsg->length) + lenExpand;
|
|
||||||
pDestMsg->length = htonl(pDestMsg->header.contLen);
|
|
||||||
pWalHead->len = pWalHead->len + lenExpand;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(pHead, pWalHead, sizeof(SWalHead) + pWalHead->len);
|
|
||||||
tfree(pWalHead);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) {
|
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) {
|
||||||
int32_t size = WAL_MAX_SIZE;
|
int32_t size = WAL_MAX_SIZE;
|
||||||
void * buffer = tmalloc(size);
|
void * buffer = malloc(size);
|
||||||
if (buffer == NULL) {
|
if (buffer == NULL) {
|
||||||
wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
|
wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
|
@ -541,14 +436,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
|
||||||
pWal->version = pHead->version;
|
pWal->version = pHead->version;
|
||||||
|
|
||||||
// wInfo("writeFp: %ld", offset);
|
// wInfo("writeFp: %ld", offset);
|
||||||
if (0 != walSMemRowCheck(pHead)) {
|
(*writeFp)(pVnode, pHead, NULL);
|
||||||
wError("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64,
|
|
||||||
pWal->vgId, fileId, pHead->version, pWal->version, pHead->len, offset);
|
|
||||||
tfClose(tfd);
|
|
||||||
tfree(buffer);
|
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
|
||||||
}
|
|
||||||
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tfClose(tfd);
|
tfClose(tfd);
|
||||||
|
@ -558,9 +446,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t walGetVersion(twalh param) {
|
uint64_t walGetVersion(SWal *pWal) {
|
||||||
SWal *pWal = param;
|
if (pWal == NULL) return 0;
|
||||||
if (pWal == 0) return 0;
|
|
||||||
|
|
||||||
return pWal->version;
|
return pWal->version;
|
||||||
}
|
}
|
||||||
|
@ -570,10 +457,9 @@ uint64_t walGetVersion(twalh param) {
|
||||||
// Some new wal record cannot be written to the wal file in dnode1 for wal version not reset, then fversion and the record in wal file may inconsistent,
|
// Some new wal record cannot be written to the wal file in dnode1 for wal version not reset, then fversion and the record in wal file may inconsistent,
|
||||||
// At this time, if dnode2 down, dnode1 switched to master. After dnode2 start and restore data from dnode1, data loss will occur
|
// At this time, if dnode2 down, dnode1 switched to master. After dnode2 start and restore data from dnode1, data loss will occur
|
||||||
|
|
||||||
void walResetVersion(twalh param, uint64_t newVer) {
|
void walResetVersion(SWal *pWal, uint64_t newVer) {
|
||||||
SWal *pWal = param;
|
if (pWal == NULL) return;
|
||||||
if (pWal == 0) return;
|
|
||||||
wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->version, newVer);
|
wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->version, newVer);
|
||||||
|
|
||||||
pWal->version = newVer;
|
pWal->version = newVer;
|
||||||
}
|
}
|
|
@ -0,0 +1,137 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
//#define _DEFAULT_SOURCE
|
||||||
|
#include "os.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "twal.h"
|
||||||
|
#include "tfile.h"
|
||||||
|
|
||||||
|
int64_t ver = 0;
|
||||||
|
void *pWal = NULL;
|
||||||
|
|
||||||
|
int writeToQueue(void *pVnode, void *data, int type, void *pMsg) {
|
||||||
|
// do nothing
|
||||||
|
SWalHead *pHead = data;
|
||||||
|
|
||||||
|
if (pHead->version > ver)
|
||||||
|
ver = pHead->version;
|
||||||
|
|
||||||
|
walWrite(pWal, pHead);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
char path[128] = "/tmp/wal";
|
||||||
|
int level = 2;
|
||||||
|
int total = 5;
|
||||||
|
int rows = 10000;
|
||||||
|
int size = 128;
|
||||||
|
int keep = 0;
|
||||||
|
|
||||||
|
for (int i=1; i<argc; ++i) {
|
||||||
|
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
||||||
|
tstrncpy(path, argv[++i], sizeof(path));
|
||||||
|
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
|
||||||
|
level = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-r")==0 && i < argc-1) {
|
||||||
|
rows = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
|
||||||
|
keep = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
||||||
|
total = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
|
||||||
|
size = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-v")==0 && i < argc-1) {
|
||||||
|
ver = atoll(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
||||||
|
dDebugFlag = atoi(argv[++i]);
|
||||||
|
} else {
|
||||||
|
printf("\nusage: %s [options] \n", argv[0]);
|
||||||
|
printf(" [-p path]: wal file path default is:%s\n", path);
|
||||||
|
printf(" [-l level]: log level, default is:%d\n", level);
|
||||||
|
printf(" [-t total]: total wal files, default is:%d\n", total);
|
||||||
|
printf(" [-r rows]: rows of records per wal file, default is:%d\n", rows);
|
||||||
|
printf(" [-k keep]: keep the wal after closing, default is:%d\n", keep);
|
||||||
|
printf(" [-v version]: initial version, default is:%" PRId64 "\n", ver);
|
||||||
|
printf(" [-d debugFlag]: debug flag, default:%d\n", dDebugFlag);
|
||||||
|
printf(" [-h help]: print out this help\n\n");
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosInitLog("wal.log", 100000, 10);
|
||||||
|
tfInit();
|
||||||
|
walInit();
|
||||||
|
|
||||||
|
SWalCfg walCfg = {0};
|
||||||
|
walCfg.walLevel = level;
|
||||||
|
walCfg.keep = keep;
|
||||||
|
|
||||||
|
pWal = walOpen(path, &walCfg);
|
||||||
|
if (pWal == NULL) {
|
||||||
|
printf("failed to open wal\n");
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int ret = walRestore(pWal, NULL, writeToQueue);
|
||||||
|
if (ret <0) {
|
||||||
|
printf("failed to restore wal\n");
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("version starts from:%" PRId64 "\n", ver);
|
||||||
|
|
||||||
|
int contLen = sizeof(SWalHead) + size;
|
||||||
|
SWalHead *pHead = (SWalHead *) malloc(contLen);
|
||||||
|
|
||||||
|
for (int i=0; i<total; ++i) {
|
||||||
|
for (int k=0; k<rows; ++k) {
|
||||||
|
pHead->version = ++ver;
|
||||||
|
pHead->len = size;
|
||||||
|
walWrite(pWal, pHead);
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("renew a wal, i:%d\n", i);
|
||||||
|
walRenew(pWal);
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("%d wal files are written\n", total);
|
||||||
|
|
||||||
|
int64_t index = 0;
|
||||||
|
char name[256];
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
int code = walGetWalFile(pWal, name, &index);
|
||||||
|
if (code == -1) {
|
||||||
|
printf("failed to get wal file, index:%" PRId64 "\n", index);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("index:%" PRId64 " wal:%s\n", index, name);
|
||||||
|
if (code == 0) break;
|
||||||
|
}
|
||||||
|
|
||||||
|
getchar();
|
||||||
|
|
||||||
|
walClose(pWal);
|
||||||
|
walCleanUp();
|
||||||
|
tfCleanup();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -402,7 +402,7 @@ void taosPrintGlobalCfg() {
|
||||||
|
|
||||||
for (int i = 0; i < tsGlobalConfigNum; ++i) {
|
for (int i = 0; i < tsGlobalConfigNum; ++i) {
|
||||||
SGlobalCfg *cfg = tsGlobalConfig + i;
|
SGlobalCfg *cfg = tsGlobalConfig + i;
|
||||||
if (tscEmbedded == 0 && !(cfg->cfgType & TSDB_CFG_CTYPE_B_CLIENT)) continue;
|
if (tscEmbeddedInUtil == 0 && !(cfg->cfgType & TSDB_CFG_CTYPE_B_CLIENT)) continue;
|
||||||
if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT) continue;
|
if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT) continue;
|
||||||
|
|
||||||
int optionLen = (int)strlen(cfg->option);
|
int optionLen = (int)strlen(cfg->option);
|
||||||
|
@ -487,7 +487,7 @@ void taosDumpGlobalCfg() {
|
||||||
printf("==================================\n");
|
printf("==================================\n");
|
||||||
for (int i = 0; i < tsGlobalConfigNum; ++i) {
|
for (int i = 0; i < tsGlobalConfigNum; ++i) {
|
||||||
SGlobalCfg *cfg = tsGlobalConfig + i;
|
SGlobalCfg *cfg = tsGlobalConfig + i;
|
||||||
if (tscEmbedded == 0 && !(cfg->cfgType & TSDB_CFG_CTYPE_B_CLIENT)) continue;
|
if (tscEmbeddedInUtil == 0 && !(cfg->cfgType & TSDB_CFG_CTYPE_B_CLIENT)) continue;
|
||||||
if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT) continue;
|
if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT) continue;
|
||||||
if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_SHOW)) continue;
|
if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_SHOW)) continue;
|
||||||
|
|
||||||
|
@ -499,7 +499,7 @@ void taosDumpGlobalCfg() {
|
||||||
|
|
||||||
for (int i = 0; i < tsGlobalConfigNum; ++i) {
|
for (int i = 0; i < tsGlobalConfigNum; ++i) {
|
||||||
SGlobalCfg *cfg = tsGlobalConfig + i;
|
SGlobalCfg *cfg = tsGlobalConfig + i;
|
||||||
if (tscEmbedded == 0 && !(cfg->cfgType & TSDB_CFG_CTYPE_B_CLIENT)) continue;
|
if (tscEmbeddedInUtil == 0 && !(cfg->cfgType & TSDB_CFG_CTYPE_B_CLIENT)) continue;
|
||||||
if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT) continue;
|
if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT) continue;
|
||||||
if (cfg->cfgType & TSDB_CFG_CTYPE_B_SHOW) continue;
|
if (cfg->cfgType & TSDB_CFG_CTYPE_B_SHOW) continue;
|
||||||
|
|
||||||
|
|
|
@ -68,6 +68,8 @@ typedef struct {
|
||||||
pthread_mutex_t logMutex;
|
pthread_mutex_t logMutex;
|
||||||
} SLogObj;
|
} SLogObj;
|
||||||
|
|
||||||
|
int8_t tscEmbeddedInUtil = 0;
|
||||||
|
|
||||||
int32_t tsLogKeepDays = 0;
|
int32_t tsLogKeepDays = 0;
|
||||||
int8_t tsAsyncLog = 1;
|
int8_t tsAsyncLog = 1;
|
||||||
float tsTotalLogDirGB = 0;
|
float tsTotalLogDirGB = 0;
|
||||||
|
|
|
@ -1,68 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef TDENGINE_WAL_INT_H
|
|
||||||
#define TDENGINE_WAL_INT_H
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#include "tlog.h"
|
|
||||||
|
|
||||||
extern int32_t wDebugFlag;
|
|
||||||
|
|
||||||
#define wFatal(...) { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", 255, __VA_ARGS__); }}
|
|
||||||
#define wError(...) { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", 255, __VA_ARGS__); }}
|
|
||||||
#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", 255, __VA_ARGS__); }}
|
|
||||||
#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL ", 255, __VA_ARGS__); }}
|
|
||||||
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
|
|
||||||
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
|
|
||||||
|
|
||||||
#define WAL_PREFIX "wal"
|
|
||||||
#define WAL_PREFIX_LEN 3
|
|
||||||
#define WAL_REFRESH_MS 1000
|
|
||||||
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
|
|
||||||
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
|
|
||||||
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
|
||||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
|
||||||
#define WAL_FILE_NUM 1 // 3
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
uint64_t version;
|
|
||||||
int64_t fileId;
|
|
||||||
int64_t rid;
|
|
||||||
int64_t tfd;
|
|
||||||
int32_t vgId;
|
|
||||||
int32_t keep;
|
|
||||||
int32_t level;
|
|
||||||
int32_t fsyncPeriod;
|
|
||||||
int32_t fsyncSeq;
|
|
||||||
int8_t stop;
|
|
||||||
int8_t reserved[3];
|
|
||||||
char path[WAL_PATH_LEN];
|
|
||||||
char name[WAL_FILE_LEN];
|
|
||||||
pthread_mutex_t mutex;
|
|
||||||
} SWal;
|
|
||||||
|
|
||||||
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
|
|
||||||
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
|
|
||||||
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif
|
|
|
@ -1,137 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
//#define _DEFAULT_SOURCE
|
|
||||||
#include "os.h"
|
|
||||||
#include "tutil.h"
|
|
||||||
#include "tglobal.h"
|
|
||||||
#include "tlog.h"
|
|
||||||
#include "twal.h"
|
|
||||||
#include "tfile.h"
|
|
||||||
|
|
||||||
int64_t ver = 0;
|
|
||||||
void *pWal = NULL;
|
|
||||||
|
|
||||||
int writeToQueue(void *pVnode, void *data, int type, void *pMsg) {
|
|
||||||
// do nothing
|
|
||||||
SWalHead *pHead = data;
|
|
||||||
|
|
||||||
if (pHead->version > ver)
|
|
||||||
ver = pHead->version;
|
|
||||||
|
|
||||||
walWrite(pWal, pHead);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
|
||||||
char path[128] = "/tmp/wal";
|
|
||||||
int level = 2;
|
|
||||||
int total = 5;
|
|
||||||
int rows = 10000;
|
|
||||||
int size = 128;
|
|
||||||
int keep = 0;
|
|
||||||
|
|
||||||
for (int i=1; i<argc; ++i) {
|
|
||||||
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
|
||||||
tstrncpy(path, argv[++i], sizeof(path));
|
|
||||||
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
|
|
||||||
level = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-r")==0 && i < argc-1) {
|
|
||||||
rows = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
|
|
||||||
keep = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
|
||||||
total = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
|
|
||||||
size = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-v")==0 && i < argc-1) {
|
|
||||||
ver = atoll(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
|
||||||
dDebugFlag = atoi(argv[++i]);
|
|
||||||
} else {
|
|
||||||
printf("\nusage: %s [options] \n", argv[0]);
|
|
||||||
printf(" [-p path]: wal file path default is:%s\n", path);
|
|
||||||
printf(" [-l level]: log level, default is:%d\n", level);
|
|
||||||
printf(" [-t total]: total wal files, default is:%d\n", total);
|
|
||||||
printf(" [-r rows]: rows of records per wal file, default is:%d\n", rows);
|
|
||||||
printf(" [-k keep]: keep the wal after closing, default is:%d\n", keep);
|
|
||||||
printf(" [-v version]: initial version, default is:%" PRId64 "\n", ver);
|
|
||||||
printf(" [-d debugFlag]: debug flag, default:%d\n", dDebugFlag);
|
|
||||||
printf(" [-h help]: print out this help\n\n");
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosInitLog("wal.log", 100000, 10);
|
|
||||||
tfInit();
|
|
||||||
walInit();
|
|
||||||
|
|
||||||
SWalCfg walCfg = {0};
|
|
||||||
walCfg.walLevel = level;
|
|
||||||
walCfg.keep = keep;
|
|
||||||
|
|
||||||
pWal = walOpen(path, &walCfg);
|
|
||||||
if (pWal == NULL) {
|
|
||||||
printf("failed to open wal\n");
|
|
||||||
exit(-1);
|
|
||||||
}
|
|
||||||
|
|
||||||
int ret = walRestore(pWal, NULL, writeToQueue);
|
|
||||||
if (ret <0) {
|
|
||||||
printf("failed to restore wal\n");
|
|
||||||
exit(-1);
|
|
||||||
}
|
|
||||||
|
|
||||||
printf("version starts from:%" PRId64 "\n", ver);
|
|
||||||
|
|
||||||
int contLen = sizeof(SWalHead) + size;
|
|
||||||
SWalHead *pHead = (SWalHead *) malloc(contLen);
|
|
||||||
|
|
||||||
for (int i=0; i<total; ++i) {
|
|
||||||
for (int k=0; k<rows; ++k) {
|
|
||||||
pHead->version = ++ver;
|
|
||||||
pHead->len = size;
|
|
||||||
walWrite(pWal, pHead);
|
|
||||||
}
|
|
||||||
|
|
||||||
printf("renew a wal, i:%d\n", i);
|
|
||||||
walRenew(pWal);
|
|
||||||
}
|
|
||||||
|
|
||||||
printf("%d wal files are written\n", total);
|
|
||||||
|
|
||||||
int64_t index = 0;
|
|
||||||
char name[256];
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
int code = walGetWalFile(pWal, name, &index);
|
|
||||||
if (code == -1) {
|
|
||||||
printf("failed to get wal file, index:%" PRId64 "\n", index);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
printf("index:%" PRId64 " wal:%s\n", index, name);
|
|
||||||
if (code == 0) break;
|
|
||||||
}
|
|
||||||
|
|
||||||
getchar();
|
|
||||||
|
|
||||||
walClose(pWal);
|
|
||||||
walCleanUp();
|
|
||||||
tfCleanup();
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
Loading…
Reference in New Issue