This commit is contained in:
Shengliang Guan 2020-10-30 18:08:48 +08:00
parent 518b9359fe
commit d4fbd7bbab
13 changed files with 209 additions and 232 deletions

View File

@ -19,6 +19,7 @@
#include "tutil.h" #include "tutil.h"
#include "tconfig.h" #include "tconfig.h"
#include "tglobal.h" #include "tglobal.h"
#include "twal.h"
#include "dnode.h" #include "dnode.h"
#include "dnodeInt.h" #include "dnodeInt.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
@ -50,6 +51,7 @@ typedef struct {
static const SDnodeComponent tsDnodeComponents[] = { static const SDnodeComponent tsDnodeComponents[] = {
{"storage", dnodeInitStorage, dnodeCleanupStorage}, {"storage", dnodeInitStorage, dnodeCleanupStorage},
{"wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!! {"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, {"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
{"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite}, {"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite},

View File

@ -68,7 +68,7 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin
// get the wal file from index or after // get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
typedef int (*FGetWalInfo)(void *ahandle, char *name, uint32_t *index); typedef int (*FGetWalInfo)(void *ahandle, char *name, int64_t *index);
// when a forward pkt is received, call this to handle data // when a forward pkt is received, call this to handle data
typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type); typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type);

View File

@ -51,13 +51,13 @@ int32_t walInit();
void walCleanUp(); void walCleanUp();
twalh walOpen(char *path, SWalCfg *pCfg); twalh walOpen(char *path, SWalCfg *pCfg);
int walAlter(twalh pWal, SWalCfg *pCfg); int32_t walAlter(twalh pWal, SWalCfg *pCfg);
void walClose(twalh); void walClose(twalh);
int walRenew(twalh); int32_t walRenew(twalh);
int walWrite(twalh, SWalHead *); int32_t walWrite(twalh, SWalHead *);
void walFsync(twalh); void walFsync(twalh);
int walRestore(twalh, void *pVnode, FWalWrite writeFp); int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
int walGetWalFile(twalh, char *name, uint32_t *index); int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
int64_t walGetVersion(twalh); int64_t walGetVersion(twalh);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -175,7 +175,7 @@ static void *sdbGetTableFromId(int32_t tableId) {
} }
static int32_t sdbInitWal() { static int32_t sdbInitWal() {
SWalCfg walCfg = {.walLevel = 2, .wals = 2, .keep = 1, .fsyncPeriod = 0}; SWalCfg walCfg = {.vgId = 1, .walLevel = 2, .wals = 2, .keep = 1, .fsyncPeriod = 0};
char temp[TSDB_FILENAME_LEN]; char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/wal", tsMnodeDir); sprintf(temp, "%s/wal", tsMnodeDir);
tsSdbObj.wal = walOpen(temp, &walCfg); tsSdbObj.wal = walOpen(temp, &walCfg);
@ -237,7 +237,7 @@ static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint3
return 0; return 0;
} }
static int sdbGetWalInfo(void *ahandle, char *name, uint32_t *index) { static int sdbGetWalInfo(void *ahandle, char *name, int64_t *index) {
return walGetWalFile(tsSdbObj.wal, name, index); return walGetWalFile(tsSdbObj.wal, name, index);
} }

View File

@ -287,7 +287,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
return -1; return -1;
} }
static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) { static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int code = -1; int code = -1;
char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file
@ -377,7 +377,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
int32_t size; int32_t size;
struct stat fstat; struct stat fstat;
int code = -1; int code = -1;
uint32_t index = 0; int64_t index = 0;
while (1) { while (1) {
// retrieve wal info // retrieve wal info

View File

@ -254,7 +254,7 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex
return magic; return magic;
} }
int getWalInfo(void *ahandle, char *name, uint32_t *index) { int getWalInfo(void *ahandle, char *name, uint64_t *index) {
struct stat fstat; struct stat fstat;
char aname[280]; char aname[280];

View File

@ -23,18 +23,19 @@ extern "C" {
#ifdef TSDB_USE_SYS_MEM #ifdef TSDB_USE_SYS_MEM
#define tmalloc(size) malloc(size) #define tmalloc(size) malloc(size)
#define tcalloc(size) calloc(1, size) #define tcalloc(size) calloc(1, size)
#define trealloc(p, size) realloc(p, size)
#define tmemalign(alignment, size) malloc(size) #define tmemalign(alignment, size) malloc(size)
#define tfree(p) free(p) #define tfree(p) free(p)
#define tmemzero(p, size) memset(p, 0, size) #define tmemzero(p, size) memset(p, 0, size)
#else #else
void *tmalloc(int32_t size); void *tmalloc(int32_t size);
void *tcalloc(int32_t size); void *tcalloc(int32_t size);
void *trealloc(void *p, int32_t size);
void *tmemalign(int32_t alignment, int32_t size); void *tmemalign(int32_t alignment, int32_t size);
void tfree(void *p); void tfree(void *p);
void tmemzero(void *p, int32_t size); void tmemzero(void *p, int32_t size);
#endif #endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -40,6 +40,16 @@ void *tcalloc(int32_t size) {
return p; return p;
} }
void *trealloc(void *p, int32_t size) {
p = realloc(p, size);
if (p == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to realloc memory, size:%d reason:%s", size, strerror(errno));
}
return p;
}
void tfree(void *p) { free(p); } void tfree(void *p) { free(p); }
void tmemzero(void *p, int32_t size) { memset(p, 0, size); } void tmemzero(void *p, int32_t size) { memset(p, 0, size); }

View File

@ -42,7 +42,7 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode);
static int32_t vnodeReadVersion(SVnodeObj *pVnode); static int32_t vnodeReadVersion(SVnodeObj *pVnode);
static int vnodeProcessTsdbStatus(void *arg, int status); static int vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static int vnodeGetWalInfo(void *ahandle, char *name, int64_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeCtrlFlow(void *handle, int32_t mseconds); static void vnodeCtrlFlow(void *handle, int32_t mseconds);
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
@ -304,6 +304,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
sprintf(temp, "%s/wal", rootDir); sprintf(temp, "%s/wal", rootDir);
pVnode->wal = walOpen(temp, &pVnode->walCfg); pVnode->wal = walOpen(temp, &pVnode->walCfg);
pVnode->walCfg.vgId = pVnode->vgId;
if (pVnode->wal == NULL) { if (pVnode->wal == NULL) {
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return terrno; return terrno;
@ -621,7 +622,7 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin
return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);
} }
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) { static int vnodeGetWalInfo(void *ahandle, char *name, int64_t *index) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = ahandle;
return walGetWalFile(pVnode->wal, name, index); return walGetWalFile(pVnode->wal, name, index);
} }

View File

@ -31,9 +31,13 @@ extern int32_t wDebugFlag;
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define walPrefix "wal" #define WAL_PREFIX "wal"
#define walRefreshIntervalMs 1000 #define WAL_PREFIX_LEN 4
#define walSignature (uint32_t)(0xFAFBFDFE) #define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (1024 * 1024)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (TSDB_FILENAME_LEN + 32)
typedef struct { typedef struct {
uint64_t version; uint64_t version;
@ -43,14 +47,9 @@ typedef struct {
int32_t level; int32_t level;
int32_t fsyncPeriod; int32_t fsyncPeriod;
int32_t fsyncSeq; int32_t fsyncSeq;
int32_t fileIndex; int64_t fileId;
void* timer; char path[WAL_PATH_LEN];
void* signature; char name[WAL_FILE_LEN];
int max; // maximum number of wal files
uint32_t id; // increase continuously
int num; // number of wal files
char path[TSDB_FILENAME_LEN];
char name[TSDB_FILENAME_LEN + 16];
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SWal; } SWal;

View File

@ -27,7 +27,6 @@ typedef struct {
int32_t num; int32_t num;
int32_t seq; int32_t seq;
int8_t stop; int8_t stop;
int8_t reserved[3];
pthread_t thread; pthread_t thread;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SWalMgmt; } SWalMgmt;
@ -48,7 +47,7 @@ int32_t walInit() {
return code; return code;
} }
wInfo("wal module is initialized"); wInfo("wal module is initialized, refId:%d", tsWal.refId);
return code; return code;
} }
@ -65,15 +64,13 @@ void *walOpen(char *path, SWalCfg *pCfg) {
return NULL; return NULL;
} }
pWal->vgId = pCfg->vgId;
pWal->fd = -1; pWal->fd = -1;
pWal->max = pCfg->wals; pWal->fileId = -1;
pWal->id = 0;
pWal->num = 0;
pWal->level = pCfg->walLevel; pWal->level = pCfg->walLevel;
pWal->keep = pCfg->keep; pWal->keep = pCfg->keep;
pWal->fsyncPeriod = pCfg->fsyncPeriod; pWal->fsyncPeriod = pCfg->fsyncPeriod;
pWal->signature = pWal; tstrncpy(pWal->path, path, sizeof(pWal->path));
tstrncpy(pWal->path, path, sizeof(path));
pthread_mutex_init(&pWal->mutex, NULL); pthread_mutex_init(&pWal->mutex, NULL);
pWal->fsyncSeq = pCfg->fsyncPeriod % 1000; pWal->fsyncSeq = pCfg->fsyncPeriod % 1000;
@ -90,12 +87,14 @@ void *walOpen(char *path, SWalCfg *pCfg) {
} }
atomic_add_fetch_32(&tsWal.num, 1); atomic_add_fetch_32(&tsWal.num, 1);
wDebug("vgId:%d, wal is opened, level:%d period:%d path:%s", pWal->vgId, pWal->level, pWal->fsyncPeriod, pWal->path); wDebug("vgId:%d, wal:%p is opened, level:%d period:%d path:%s", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod,
pWal->path);
return pWal; return pWal;
} }
int32_t walAlter(void *handle, SWalCfg *pCfg) { int32_t walAlter(void *handle, SWalCfg *pCfg) {
if (handle == NULL) return TSDB_CODE_WAL_APP_ERROR;
SWal *pWal = handle; SWal *pWal = handle;
if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) { if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) {
@ -109,26 +108,27 @@ int32_t walAlter(void *handle, SWalCfg *pCfg) {
pWal->level = pCfg->walLevel; pWal->level = pCfg->walLevel;
pWal->fsyncPeriod = pCfg->fsyncPeriod; pWal->fsyncPeriod = pCfg->fsyncPeriod;
pWal->fsyncSeq = pCfg->fsyncPeriod % 1000;
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void walClose(void *handle) { void walClose(void *handle) {
if (handle == NULL) return;
SWal *pWal = handle; SWal *pWal = handle;
taosClose(pWal->fd); taosClose(pWal->fd);
if (pWal->keep == 0) { if (pWal->keep == 0) {
// remove all files in the directory snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, pWal->fileId);
for (int32_t i = 0; i < pWal->num; ++i) {
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id - i);
if (remove(pWal->name) < 0) { if (remove(pWal->name) < 0) {
wError("vgId:%d, wal:%s, failed to remove", pWal->vgId, pWal->name); wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
} else { } else {
wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, pWal->name); wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
}
} }
} else { } else {
wDebug("vgId:%d, wal:%s, it is closed and kept", pWal->vgId, pWal->name); wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name);
} }
taosRemoveRef(tsWal.refId, pWal); taosRemoveRef(tsWal.refId, pWal);
@ -145,6 +145,8 @@ static int32_t walInitObj(SWal *pWal) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
walRenew(pWal);
if (pWal && pWal->fd < 0) { if (pWal && pWal->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->path, strerror(errno)); wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->path, strerror(errno));
@ -156,53 +158,45 @@ static int32_t walInitObj(SWal *pWal) {
} }
static void walFreeObj(void *wal) { static void walFreeObj(void *wal) {
SWal *pWal = pWal; SWal *pWal = wal;
wDebug("vgId:%d, wal is freed", pWal->vgId); wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal);
taosClose(pWal->fd); taosClose(pWal->fd);
pthread_mutex_destroy(&pWal->mutex); pthread_mutex_destroy(&pWal->mutex);
tfree(pWal); tfree(pWal);
} }
// static bool walNeedFsync(SWal *pWal) { static bool walNeedFsync(SWal *pWal) {
// if (pWal->fsyncPeriod <= 0 || pWal->level != TAOS_WAL_FSYNC) { if (pWal->fsyncPeriod <= 0 || pWal->level != TAOS_WAL_FSYNC) {
// return false; return false;
// } }
// if (tsWal.seq % pWal->fsyncSeq == 0) { if (tsWal.seq % pWal->fsyncSeq == 0) {
// return true; return true;
// } }
// return false; return false;
// } }
static void walUpdateSeq() { static void walUpdateSeq() {
taosMsleep(walRefreshIntervalMs); taosMsleep(WAL_REFRESH_MS);
if (++tsWal.seq <= 0) { if (++tsWal.seq <= 0) {
tsWal.seq = 1; tsWal.seq = 1;
} }
} }
static void walFsyncAll() { static void walFsyncAll() {
// int32_t code; SWal *pWal = taosIterateRef(tsWal.refId, NULL);
// void * pIter = taosRefCreateIter(tsWal.refId); while (pWal) {
if (walNeedFsync(pWal)) {
// while (taosRefIterNext(pIter)) { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq);
// SWal *pWal = taosRefIterGet(pIter); int32_t code = fsync(pWal->fd);
// if (pWal == NULL) break; if (code != 0) {
wError("vgId:%d, wal:%s, fsync failed, reason:%s", pWal->vgId, pWal->name, strerror(code));
// if (!walNeedFsync(pWal)) { }
// wTrace("wal:%s, do fsync, level:%d seq:%d rseq:%d", pWal->name, pWal->level, pWal->fsyncSeq, tsWal.refreshSeq); }
// code = walFsync(pWal); pWal = taosIterateRef(tsWal.refId, pWal);
// if (code != TSDB_CODE_SUCCESS) { }
// wError("wal:%s, fsync failed(%s)", pWal->name, strerror(code));
// }
// }
// taosReleaseRef(pWal);
// }
// taosRefDestroyIter(pIter);
} }
static void *walThreadFunc(void *param) { static void *walThreadFunc(void *param) {
@ -232,6 +226,7 @@ static int32_t walCreateThread() {
} }
static void walStopThread() { static void walStopThread() {
tsWal.stop = 1;
if (tsWal.thread) { if (tsWal.thread) {
pthread_join(tsWal.thread, NULL); pthread_join(tsWal.thread, NULL);
} }

View File

@ -23,77 +23,79 @@
#include "twal.h" #include "twal.h"
#include "walInt.h" #include "walInt.h"
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp); static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name);
static int32_t walRemoveWalFiles(const char *path); static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId, char *nextFileName);
int32_t walRenew(void *handle) { int32_t walRenew(void *handle) {
if (handle == NULL) return 0; if (handle == NULL) return 0;
SWal *pWal = handle;
terrno = 0; SWal * pWal = handle;
int32_t code = 0;
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
if (pWal->fd >= 0) { if (pWal->fd >= 0) {
close(pWal->fd); close(pWal->fd);
pWal->id++;
wDebug("vgId:%d, wal:%s, it is closed", pWal->vgId, pWal->name); wDebug("vgId:%d, wal:%s, it is closed", pWal->vgId, pWal->name);
} }
pWal->num++; uint64_t lastId = pWal->fileId;
if (pWal->keep) {
pWal->fileId = 0;
} else {
pWal->fileId = taosGetTimestampUs();
}
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id); snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, pWal->fileId);
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) { if (pWal->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal:%s, failed to open, reason:%s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, wal:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
} else { } else {
wDebug("vgId:%d, wal:%s, it is created", pWal->vgId, pWal->name); wDebug("vgId:%d, wal:%s, it is created", pWal->vgId, pWal->name);
if (pWal->num > pWal->max) {
// remove the oldest wal file
char name[TSDB_FILENAME_LEN * 3];
snprintf(name, sizeof(name), "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max);
if (remove(name) < 0) {
wError("vgId:%d, wal:%s, failed to remove(%s)", pWal->vgId, name, strerror(errno));
} else {
wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, name);
} }
pWal->num--; if (pWal->keep != 1 && lastId != -1) {
// remove last wal file
char name[TSDB_FILENAME_LEN + 20];
snprintf(name, sizeof(name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, lastId);
if (remove(name) < 0) {
wError("vgId:%d, wal:%s, failed to remove since %s", pWal->vgId, pWal->name, strerror(errno));
} else {
wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, pWal->name);
} }
} }
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
return terrno; return code;
} }
int32_t walWrite(void *handle, SWalHead *pHead) { int32_t walWrite(void *handle, SWalHead *pHead) {
SWal *pWal = handle; if (handle == NULL) return -1;
if (pWal == NULL) return -1;
terrno = 0; SWal * pWal = handle;
int32_t code = 0;
// no wal // no wal
if (pWal->level == TAOS_WAL_NOLOG) return 0; if (pWal->level == TAOS_WAL_NOLOG) return 0;
if (pHead->version <= pWal->version) return 0; if (pHead->version <= pWal->version) return 0;
pHead->signature = walSignature; pHead->signature = WAL_SIGNATURE;
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
int32_t contLen = pHead->len + sizeof(SWalHead); int32_t contLen = pHead->len + sizeof(SWalHead);
if (taosTWrite(pWal->fd, pHead, contLen) != contLen) { if (taosTWrite(pWal->fd, pHead, contLen) != contLen) {
terrno = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal:%s, failed to write(%s)", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, wal:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno));
return terrno;
} else { } else {
pWal->version = pHead->version; pWal->version = pHead->version;
} }
ASSERT(contLen == pHead->len + sizeof(SWalHead)); ASSERT(contLen == pHead->len + sizeof(SWalHead));
return 0; return code;
} }
void walFsync(void *handle) { void walFsync(void *handle) {
@ -102,156 +104,116 @@ void walFsync(void *handle) {
if (pWal->fsyncPeriod == 0) { if (pWal->fsyncPeriod == 0) {
if (fsync(pWal->fd) < 0) { if (fsync(pWal->fd) < 0) {
wError("vgId:%d, wal:%s, fsync failed(%s)", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, wal:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno));
} }
} }
} }
int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *, int32_t)) { int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *, int32_t)) {
if (handle == NULL) return -1;
SWal * pWal = handle; SWal * pWal = handle;
int32_t count = 0; int32_t count = 0;
uint32_t maxId = 0;
uint32_t minId = -1;
uint32_t index = 0;
int32_t code = 0;
struct dirent *ent;
terrno = 0; DIR *dir = opendir(pWal->path);
int32_t plen = strlen(walPrefix);
char opath[TSDB_FILENAME_LEN + 5];
snprintf(opath, sizeof(opath), "%s", pWal->path);
DIR *dir = opendir(opath);
if (dir == NULL && errno == ENOENT) return 0; if (dir == NULL && errno == ENOENT) return 0;
if (dir == NULL) { if (dir == NULL) return TAOS_SYSTEM_ERROR(errno);
code = TAOS_SYSTEM_ERROR(errno);
return code;
}
struct dirent *ent;
while ((ent = readdir(dir)) != NULL) { while ((ent = readdir(dir)) != NULL) {
if (strncmp(ent->d_name, walPrefix, plen) == 0) { char *fileName = ent->d_name;
index = atol(ent->d_name + plen);
if (index > maxId) maxId = index; if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
if (index < minId) minId = index; uint64_t fileId = atoll(fileName + WAL_PREFIX_LEN);
if (fileId == pWal->fileId) continue;
wDebug("vgId:%d, wal:%s, will be restored", pWal->vgId, fileName);
int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, fileName);
if (code != TSDB_CODE_SUCCESS) continue;
wInfo("vgId:%d, wal:%s, restore success, remove this file", pWal->vgId, fileName);
remove(fileName);
count++; count++;
} }
} }
closedir(dir); closedir(dir);
pWal->fileIndex = maxId;
if (pWal->keep) {
if (count == 0) { if (count == 0) {
if (pWal->keep) terrno = walRenew(pWal); return walRenew(pWal);
return terrno;
}
if (count != (maxId - minId + 1)) {
wError("vgId:%d, wal:%s, messed up, count:%d max:%d min:%d", pWal->vgId, opath, count, maxId, minId);
terrno = TSDB_CODE_WAL_APP_ERROR;
} else {
wDebug("vgId:%d, wal:%s, %d files will be restored", pWal->vgId, opath, count);
for (index = minId; index <= maxId; ++index) {
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, index);
terrno = walRestoreWalFile(pWal, pVnode, writeFp);
if (terrno < 0) continue;
}
}
if (terrno == 0) {
if (pWal->keep == 0) {
terrno = walRemoveWalFiles(opath);
if (terrno == 0) {
if (remove(opath) < 0) {
wError("vgId:%d, wal:%s, failed to remove directory, reason:%s", pWal->vgId, opath, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
}
}
} else { } else {
// open the existing WAL file in append mode // open the existing WAL file in append mode
pWal->num = count; pWal->fileId = 0;
pWal->id = maxId; snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, maxId);
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) { if (pWal->fd < 0) {
wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
} }
} }
return terrno; return TSDB_CODE_SUCCESS;
} }
int32_t walGetWalFile(void *handle, char *name, uint32_t *index) { int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
SWal * pWal = handle; if (handle == NULL) return -1;
int32_t code = 1; SWal *pWal = handle;
int32_t first = 0;
name[0] = 0;
if (pWal == NULL || pWal->num == 0) return 0;
pthread_mutex_lock(&(pWal->mutex)); pthread_mutex_lock(&(pWal->mutex));
int32_t code = walGetNextFile(pWal, *fileId, fileId, fileName);
first = pWal->id + 1 - pWal->num; if (code == 0) {
if (*index == 0) *index = first; // set to first one code = (*fileId == pWal->fileId) ? 0 : 1;
if (*index < first && *index > pWal->id) {
code = -1; // index out of range
} else {
sprintf(name, "wal/%s%d", walPrefix, *index);
code = (*index == pWal->id) ? 0 : 1;
} }
pthread_mutex_unlock(&(pWal->mutex)); pthread_mutex_unlock(&(pWal->mutex));
return code; return code;
} }
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name) {
char * name = pWal->name; int32_t size = WAL_MAX_SIZE;
int32_t size = 1024 * 1024; // default 1M buffer size void * buffer = tmalloc(size);
terrno = 0;
char *buffer = malloc(size);
if (buffer == NULL) { if (buffer == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
return terrno; return TAOS_SYSTEM_ERROR(errno);
} }
SWalHead *pHead = (SWalHead *)buffer;
int32_t fd = open(name, O_RDWR); int32_t fd = open(name, O_RDWR);
if (fd < 0) { if (fd < 0) {
wError("vgId:%d, wal:%s, failed to open for restore(%s)", pWal->vgId, name, strerror(errno)); wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); tfree(buffer);
free(buffer); return TAOS_SYSTEM_ERROR(errno);
return terrno;
} }
wDebug("vgId:%d, wal:%s, start to restore", pWal->vgId, name); wDebug("vgId:%d, wal:%s, start to restore", pWal->vgId, name);
int32_t code = TSDB_CODE_SUCCESS;
size_t offset = 0; size_t offset = 0;
SWalHead *pHead = buffer;
while (1) { while (1) {
int32_t ret = taosTRead(fd, pHead, sizeof(SWalHead)); int32_t ret = taosTRead(fd, pHead, sizeof(SWalHead));
if (ret == 0) break; if (ret == 0) break;
if (ret < 0) { if (ret < 0) {
wError("vgId:%d, wal:%s, failed to read wal head part since %s", pWal->vgId, name, strerror(errno)); wError("vgId:%d, wal:%s, failed to read wal head part since %s", pWal->vgId, name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
break; break;
} }
if (ret < sizeof(SWalHead)) { if (ret < sizeof(SWalHead)) {
wError("vgId:%d, wal:%s, failed to read head, ret:%d, skip the rest of file", pWal->vgId, name, ret); wError("vgId:%d, wal:%s, failed to read wal head since %s, read size:%d, skip the rest of file", pWal->vgId,
name, strerror(errno), ret);
taosFtruncate(fd, offset); taosFtruncate(fd, offset);
fsync(fd); fsync(fd);
break; break;
} }
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wWarn("vgId:%d, wal:%s, cksum is messed up, skip the rest of file", pWal->vgId, name); wError("vgId:%d, wal:%s, wal head cksum is messed up, skip the rest of file", pWal->vgId, name);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; code = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(false); ASSERT(false);
break; break;
} }
@ -260,23 +222,24 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
size = sizeof(SWalHead) + pHead->len; size = sizeof(SWalHead) + pHead->len;
buffer = realloc(buffer, size); buffer = realloc(buffer, size);
if (buffer == NULL) { if (buffer == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno);
break; break;
} }
pHead = (SWalHead *)buffer; pHead = buffer;
} }
ret = taosTRead(fd, pHead->cont, pHead->len); ret = taosTRead(fd, pHead->cont, pHead->len);
if (ret < 0) { if (ret < 0) {
wError("vgId:%d, wal:%s failed to read wal body part since %s", pWal->vgId, name, strerror(errno)); wError("vgId:%d, wal:%s failed to read wal body part since %s", pWal->vgId, name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
break; break;
} }
if (ret < pHead->len) { if (ret < pHead->len) {
wError("vgId:%d, wal:%s, failed to read body, len:%d ret:%d, skip the rest of file", pWal->vgId, name, pHead->len, wError("vgId:%d, wal:%s, failed to read body since %s, read size:%d len:%d , skip the rest of file", pWal->vgId,
ret); name, strerror(errno), ret, pHead->len);
taosFtruncate(fd, offset); taosFtruncate(fd, offset);
fsync(fd); fsync(fd);
break; break;
@ -289,38 +252,9 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
} }
close(fd); close(fd);
free(buffer); tfree(buffer);
return terrno; return code;
}
static int32_t walRemoveWalFiles(const char *path) {
int32_t plen = strlen(walPrefix);
char name[TSDB_FILENAME_LEN * 3];
terrno = 0;
struct dirent *ent;
DIR *dir = opendir(path);
if (dir == NULL && errno == ENOENT) return 0;
if (dir == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
}
while ((ent = readdir(dir)) != NULL) {
if (strncmp(ent->d_name, walPrefix, plen) == 0) {
snprintf(name, sizeof(name), "%s/%s", path, ent->d_name);
if (remove(name) < 0) {
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
}
}
}
closedir(dir);
return terrno;
} }
int64_t walGetVersion(twalh param) { int64_t walGetVersion(twalh param) {
@ -329,3 +263,38 @@ int64_t walGetVersion(twalh param) {
return pWal->version; return pWal->version;
} }
static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId, char *nextFileName) {
int64_t nearFileId = INT64_MAX;
char nearFileName[WAL_FILE_LEN] = {0};
DIR *dir = opendir(pWal->path);
if (dir == NULL) {
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
return -1;
}
struct dirent *ent;
while ((ent = readdir(dir)) != NULL) {
char *fileName = ent->d_name;
if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
uint64_t fileId = atoll(fileName + WAL_PREFIX_LEN);
if (fileId <= lastFileId) continue;
if (fileId < nearFileId) {
nearFileId = fileId;
tstrncpy(nearFileName, fileName, WAL_FILE_LEN);
}
}
}
closedir(dir);
if (nearFileId == INT64_MAX) return -1;
*nexFileId = nearFileId;
tstrncpy(nextFileName, nearFileName, WAL_FILE_LEN);
wTrace("vgId:%d, path:%s, lastfile %" PRId64 ", nextfile is %s", pWal->vgId, pWal->path, lastFileId, nextFileName);
return 0;
}

View File

@ -115,17 +115,17 @@ int main(int argc, char *argv[]) {
printf("%d wal files are written\n", total); printf("%d wal files are written\n", total);
uint32_t index = 0; int64_t index = 0;
char name[256]; char name[256];
while (1) { while (1) {
int code = walGetWalFile(pWal, name, &index); int code = walGetWalFile(pWal, name, &index);
if (code == -1) { if (code == -1) {
printf("failed to get wal file, index:%d\n", index); printf("failed to get wal file, index:%" PRId64 "\n", index);
break; break;
} }
printf("index:%d wal:%s\n", index, name); printf("index:%" PRId64 " wal:%s\n", index, name);
if (code == 0) break; if (code == 0) break;
index++; index++;