From d0e60fdec1ce9de57027b51b3d2d3162b4de6c2e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 29 Oct 2020 12:17:56 +0000 Subject: [PATCH 01/12] TD-1856 TD-1847 --- src/inc/twal.h | 19 +- src/{wal/inc/walMgmt.h => util/inc/talloc.h} | 19 +- src/util/src/talloc.c | 66 ++++ src/wal/inc/walInt.h | 20 ++ src/wal/src/walMgmt.c | 221 +++++++++++- src/wal/src/walWrite.c | 351 +++---------------- 6 files changed, 387 insertions(+), 309 deletions(-) rename src/{wal/inc/walMgmt.h => util/inc/talloc.h} (59%) create mode 100644 src/util/src/talloc.c diff --git a/src/inc/twal.h b/src/inc/twal.h index 1ce7b132b0..626c4434e9 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -19,9 +19,11 @@ extern "C" { #endif -#define TAOS_WAL_NOLOG 0 -#define TAOS_WAL_WRITE 1 -#define TAOS_WAL_FSYNC 2 +typedef enum { + TAOS_WAL_NOLOG = 0, + TAOS_WAL_WRITE = 1, + TAOS_WAL_FSYNC = 2 +} EWalType; typedef struct { int8_t msgType; @@ -34,17 +36,22 @@ typedef struct { } SWalHead; typedef struct { - int8_t walLevel; // wal level + int32_t vgId; int32_t fsyncPeriod; // millisecond + int8_t walLevel; // wal level int8_t wals; // number of WAL files; int8_t keep; // keep the wal file when closed + int8_t reserved[5]; } SWalCfg; typedef void* twalh; // WAL HANDLE typedef int (*FWalWrite)(void *ahandle, void *pHead, int type); -twalh walOpen(const char *path, const SWalCfg *pCfg); -int walAlter(twalh pWal, const SWalCfg *pCfg); +int32_t walInit(); +void walCleanUp(); + +twalh walOpen(char *path, SWalCfg *pCfg); +int walAlter(twalh pWal, SWalCfg *pCfg); void walClose(twalh); int walRenew(twalh); int walWrite(twalh, SWalHead *); diff --git a/src/wal/inc/walMgmt.h b/src/util/inc/talloc.h similarity index 59% rename from src/wal/inc/walMgmt.h rename to src/util/inc/talloc.h index a23c7f8ec3..5f04f4b99c 100644 --- a/src/wal/inc/walMgmt.h +++ b/src/util/inc/talloc.h @@ -13,13 +13,28 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_WAL_MGMT_H -#define TDENGINE_WAL_MGMT_H +#ifndef TDENGINE_UTIL_ALLOC_H +#define TDENGINE_UTIL_ALLOC_H #ifdef __cplusplus extern "C" { #endif +#ifdef TSDB_USE_SYS_MEM + #define tmalloc(size) malloc(size) + #define tcalloc(size) calloc(1, size) + #define tmemalign(alignment, size) malloc(size) + #define tfree(p) free(p) + #define tmemzero(p, size) memset(p, 0, size) +#else + void *tmalloc(int32_t size); + void *tcalloc(int32_t size); + void *tmemalign(int32_t alignment, int32_t size); + void tfree(void *p); + void tmemzero(void *p, int32_t size); +#endif + + #ifdef __cplusplus } #endif diff --git a/src/util/src/talloc.c b/src/util/src/talloc.c new file mode 100644 index 0000000000..4f7067a26a --- /dev/null +++ b/src/util/src/talloc.c @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "tulog.h" + +#define TSDB_HAVE_MEMALIGN + +void *tmalloc(int32_t size) { + void *p = malloc(size); + if (p == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to malloc memory, size:%d reason:%s", size, strerror(errno)); + } + + return p; +} + +void *tcalloc(int32_t size) { + void *p = calloc(1, size); + if (p == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to calloc memory, size:%d reason:%s", size, strerror(errno)); + } + + return p; +} + +void tfree(void *p) { free(p); } + +void tmemzero(void *p, int32_t size) { memset(p, 0, size); } + +#ifdef TSDB_HAVE_MEMALIGN + +void *tmemalign(int32_t alignment, int32_t size) { + void *p; + + int err = posix_memalign(&p, alignment, size); + if (err) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to memalign memory, alignment:%d size:%d reason:%s", alignment, size, strerror(err)); + p = NULL; + } + + return p; +} + +#else + +void *tmemalign(int32_t alignment, int32_t size) { return tmalloc(size); } + +#endif diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index 593611589d..fc6955ab97 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -32,8 +32,28 @@ extern int32_t wDebugFlag; #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define walPrefix "wal" +#define walRefreshIntervalMs 1000 #define walSignature (uint32_t)(0xFAFBFDFE) +typedef struct { + uint64_t version; + int32_t vgId; + int32_t fd; + int32_t keep; + int32_t level; + int32_t fsyncPeriod; + int32_t fsyncSeq; + int32_t fileIndex; + void* timer; + void* signature; + int max; // maximum number of wal files + uint32_t id; // increase continuously + int num; // number of wal files + char path[TSDB_FILENAME_LEN]; + char name[TSDB_FILENAME_LEN + 16]; + pthread_mutex_t mutex; +} SWal; + #ifdef __cplusplus } #endif diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 2dd094d860..ebd1328c1b 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -16,6 +16,225 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" +#include "talloc.h" +#include "tref.h" +#include "tutil.h" #include "twal.h" #include "walInt.h" -#include "walMgmt.h" \ No newline at end of file + +typedef struct { + int32_t refId; + int32_t num; + int32_t seq; + int8_t stop; + int8_t reserved[3]; + pthread_t thread; + pthread_mutex_t mutex; +} SWalMgmt; + +static SWalMgmt tsWal; +static int32_t walCreateThread(); +static void walStopThread(); +static int32_t walInitObj(SWal *pWal); +static void walFreeObj(void *pWal); + +int32_t walInit() { + tmemzero(&tsWal, sizeof(SWalMgmt)); + tsWal.refId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); + + int32_t code = walCreateThread(); + if (code != TSDB_CODE_SUCCESS) { + wError("failed to init wal module, reason:%s", tstrerror(code)); + return code; + } + + wInfo("wal module is initialized"); + return code; +} + +void walCleanUp() { + walStopThread(); + taosCloseRef(tsWal.refId); + wInfo("wal module is cleaned up"); +} + +void *walOpen(char *path, SWalCfg *pCfg) { + SWal *pWal = tcalloc(sizeof(SWal)); + if (pWal == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } + + pWal->fd = -1; + pWal->max = pCfg->wals; + pWal->id = 0; + pWal->num = 0; + pWal->level = pCfg->walLevel; + pWal->keep = pCfg->keep; + pWal->fsyncPeriod = pCfg->fsyncPeriod; + pWal->signature = pWal; + tstrncpy(pWal->path, path, sizeof(path)); + pthread_mutex_init(&pWal->mutex, NULL); + + pWal->fsyncSeq = pCfg->fsyncPeriod % 1000; + if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; + + if (walInitObj(pWal) != TSDB_CODE_SUCCESS) { + walFreeObj(pWal); + return NULL; + } + + if (taosAddRef(tsWal.refId, pWal) != TSDB_CODE_SUCCESS) { + walFreeObj(pWal); + return NULL; + } + + atomic_add_fetch_32(&tsWal.num, 1); + wDebug("vgId:%d, wal is opened, level:%d period:%d path:%s", pWal->vgId, pWal->level, pWal->fsyncPeriod, pWal->path); + + return pWal; +} + +int32_t walAlter(void *handle, SWalCfg *pCfg) { + SWal *pWal = handle; + + if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) { + wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->vgId, pWal->level, + pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); + return TSDB_CODE_SUCCESS; + } + + wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->vgId, pWal->level, + pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); + + pWal->level = pCfg->walLevel; + pWal->fsyncPeriod = pCfg->fsyncPeriod; + + return TSDB_CODE_SUCCESS; +} + +void walClose(void *handle) { + SWal *pWal = handle; + taosClose(pWal->fd); + + if (pWal->keep == 0) { + // remove all files in the directory + for (int32_t i = 0; i < pWal->num; ++i) { + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id - i); + if (remove(pWal->name) < 0) { + wError("vgId:%d, wal:%s, failed to remove", pWal->vgId, pWal->name); + } else { + wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, pWal->name); + } + } + } else { + wDebug("vgId:%d, wal:%s, it is closed and kept", pWal->vgId, pWal->name); + } + + taosRemoveRef(tsWal.refId, pWal); +} + +static int32_t walInitObj(SWal *pWal) { + if (taosMkDir(pWal->path, 0755) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal:%s, failed to create directory, reason:%s", pWal->vgId, pWal->path, strerror(errno)); + return terrno; + } + + if (pWal->keep == 1) { + return TSDB_CODE_SUCCESS; + } + + if (pWal && pWal->fd < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->path, strerror(errno)); + return terrno; + } + + wDebug("vgId:%d, wal:%s, is initialized", pWal->vgId, pWal->name); + return TSDB_CODE_SUCCESS; +} + +static void walFreeObj(void *wal) { + SWal *pWal = pWal; + wDebug("vgId:%d, wal is freed", pWal->vgId); + + taosClose(pWal->fd); + pthread_mutex_destroy(&pWal->mutex); + tfree(pWal); +} + +// static bool walNeedFsync(SWal *pWal) { +// if (pWal->fsyncPeriod <= 0 || pWal->level != TAOS_WAL_FSYNC) { +// return false; +// } + +// if (tsWal.seq % pWal->fsyncSeq == 0) { +// return true; +// } + +// return false; +// } + +static void walUpdateSeq() { + taosMsleep(walRefreshIntervalMs); + if (++tsWal.seq <= 0) { + tsWal.seq = 1; + } +} + +static void walFsyncAll() { + // int32_t code; + // void * pIter = taosRefCreateIter(tsWal.refId); + + // while (taosRefIterNext(pIter)) { + // SWal *pWal = taosRefIterGet(pIter); + // if (pWal == NULL) break; + + // if (!walNeedFsync(pWal)) { + // wTrace("wal:%s, do fsync, level:%d seq:%d rseq:%d", pWal->name, pWal->level, pWal->fsyncSeq, tsWal.refreshSeq); + // code = walFsync(pWal); + // if (code != TSDB_CODE_SUCCESS) { + // wError("wal:%s, fsync failed(%s)", pWal->name, strerror(code)); + // } + // } + + // taosReleaseRef(pWal); + // } + + // taosRefDestroyIter(pIter); +} + +static void *walThreadFunc(void *param) { + while (1) { + walUpdateSeq(); + walFsyncAll(); + if (tsWal.stop) break; + } + + return NULL; +} + +static int32_t walCreateThread() { + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) { + wError("failed to create wal thread, reason:%s", strerror(errno)); + return TAOS_SYSTEM_ERROR(errno); + } + + pthread_attr_destroy(&thAttr); + wDebug("wal thread is launched"); + + return TSDB_CODE_SUCCESS; +} + +static void walStopThread() { + if (tsWal.thread) { + pthread_join(tsWal.thread, NULL); + } + + wDebug("wal thread is stopped"); +} diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 95587caa14..2eea80e20a 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -15,171 +15,18 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "twal.h" -#include "walInt.h" -#include "walMgmt.h" +#include "talloc.h" +#include "taoserror.h" #include "tchecksum.h" #include "tutil.h" -#include "ttimer.h" -#include "taoserror.h" -#include "twal.h" #include "tqueue.h" +#include "twal.h" +#include "walInt.h" +static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp); +static int32_t walRemoveWalFiles(const char *path); -typedef struct { - uint64_t version; - int fd; - int keep; - int level; - int32_t fsyncPeriod; - void *timer; - void *signature; - int max; // maximum number of wal files - uint32_t id; // increase continuously - int num; // number of wal files - char path[TSDB_FILENAME_LEN]; - char name[TSDB_FILENAME_LEN+16]; - pthread_mutex_t mutex; -} SWal; - -static void *walTmrCtrl = NULL; -static int tsWalNum = 0; -static pthread_once_t walModuleInit = PTHREAD_ONCE_INIT; -static int walHandleExistingFiles(const char *path); -static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp); -static int walRemoveWalFiles(const char *path); -static void walProcessFsyncTimer(void *param, void *tmrId); -static void walRelease(SWal *pWal); -static int walGetMaxOldFileId(char *odir); - -static void walModuleInitFunc() { - walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL"); - if (walTmrCtrl == NULL) - walModuleInit = PTHREAD_ONCE_INIT; - else - wDebug("WAL module is initialized"); -} - -static inline bool walNeedFsyncTimer(SWal *pWal) { - if (pWal->fsyncPeriod > 0 && pWal->level == TAOS_WAL_FSYNC) { - return true; - } - return false; -} - -void *walOpen(const char *path, const SWalCfg *pCfg) { - SWal *pWal = calloc(sizeof(SWal), 1); - if (pWal == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } - - pthread_once(&walModuleInit, walModuleInitFunc); - if (walTmrCtrl == NULL) { - free(pWal); - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } - - atomic_add_fetch_32(&tsWalNum, 1); - pWal->fd = -1; - pWal->max = pCfg->wals; - pWal->id = 0; - pWal->num = 0; - pWal->level = pCfg->walLevel; - pWal->keep = pCfg->keep; - pWal->fsyncPeriod = pCfg->fsyncPeriod; - pWal->signature = pWal; - tstrncpy(pWal->path, path, sizeof(pWal->path)); - pthread_mutex_init(&pWal->mutex, NULL); - - if (walNeedFsyncTimer(pWal)) { - pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl); - if (pWal->timer == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - walRelease(pWal); - return NULL; - } - } - - if (taosMkDir(path, 0755) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - wError("wal:%s, failed to create directory(%s)", path, strerror(errno)); - walRelease(pWal); - pWal = NULL; - } - - if (pCfg->keep == 1) return pWal; - - if (walHandleExistingFiles(path) == 0) walRenew(pWal); - - if (pWal && pWal->fd < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - wError("wal:%s, failed to open(%s)", path, strerror(errno)); - walRelease(pWal); - pWal = NULL; - } - - if (pWal) wDebug("wal:%s, it is open, level:%d fsyncPeriod:%d", path, pWal->level, pWal->fsyncPeriod); - return pWal; -} - -int walAlter(twalh wal, const SWalCfg *pCfg) { - SWal *pWal = wal; - if (pWal == NULL) { - return TSDB_CODE_WAL_APP_ERROR; - } - - if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) { - wDebug("wal:%s, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->name, pWal->level, - pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); - return TSDB_CODE_SUCCESS; - } - - wInfo("wal:%s, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->name, pWal->level, pWal->fsyncPeriod, - pCfg->walLevel, pCfg->fsyncPeriod); - - pthread_mutex_lock(&pWal->mutex); - pWal->level = pCfg->walLevel; - pWal->fsyncPeriod = pCfg->fsyncPeriod; - if (walNeedFsyncTimer(pWal)) { - wInfo("wal:%s, reset fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod); - taosTmrReset(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, &pWal->timer, walTmrCtrl); - } else { - wInfo("wal:%s, stop fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod); - taosTmrStop(pWal->timer); - pWal->timer = NULL; - } - pthread_mutex_unlock(&pWal->mutex); - - return TSDB_CODE_SUCCESS; -} - -void walClose(void *handle) { - if (handle == NULL) return; - - SWal *pWal = handle; - taosClose(pWal->fd); - if (pWal->timer) taosTmrStopA(&pWal->timer); - - if (pWal->keep == 0) { - // remove all files in the directory - for (int i = 0; i < pWal->num; ++i) { - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id - i); - if (remove(pWal->name) < 0) { - wError("wal:%s, failed to remove", pWal->name); - } else { - wDebug("wal:%s, it is removed", pWal->name); - } - } - } else { - wDebug("wal:%s, it is closed and kept", pWal->name); - } - - walRelease(pWal); -} - -int walRenew(void *handle) { +int32_t walRenew(void *handle) { if (handle == NULL) return 0; SWal *pWal = handle; @@ -190,7 +37,7 @@ int walRenew(void *handle) { if (pWal->fd >= 0) { close(pWal->fd); pWal->id++; - wDebug("wal:%s, it is closed", pWal->name); + wDebug("vgId:%d, wal:%s, it is closed", pWal->vgId, pWal->name); } pWal->num++; @@ -199,19 +46,19 @@ int walRenew(void *handle) { pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); if (pWal->fd < 0) { - wError("wal:%s, failed to open(%s)", pWal->name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal:%s, failed to open, reason:%s", pWal->vgId, pWal->name, strerror(errno)); } else { - wDebug("wal:%s, it is created", pWal->name); + wDebug("vgId:%d, wal:%s, it is created", pWal->vgId, pWal->name); if (pWal->num > pWal->max) { // remove the oldest wal file char name[TSDB_FILENAME_LEN * 3]; snprintf(name, sizeof(name), "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max); if (remove(name) < 0) { - wError("wal:%s, failed to remove(%s)", name, strerror(errno)); + wError("vgId:%d, wal:%s, failed to remove(%s)", pWal->vgId, name, strerror(errno)); } else { - wDebug("wal:%s, it is removed", name); + wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, name); } pWal->num--; @@ -223,7 +70,7 @@ int walRenew(void *handle) { return terrno; } -int walWrite(void *handle, SWalHead *pHead) { +int32_t walWrite(void *handle, SWalHead *pHead) { SWal *pWal = handle; if (pWal == NULL) return -1; @@ -235,11 +82,11 @@ int walWrite(void *handle, SWalHead *pHead) { pHead->signature = walSignature; taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); - int contLen = pHead->len + sizeof(SWalHead); + int32_t contLen = pHead->len + sizeof(SWalHead); if (taosTWrite(pWal->fd, pHead, contLen) != contLen) { - wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal:%s, failed to write(%s)", pWal->vgId, pWal->name, strerror(errno)); return terrno; } else { pWal->version = pHead->version; @@ -255,29 +102,30 @@ void walFsync(void *handle) { if (pWal->fsyncPeriod == 0) { if (fsync(pWal->fd) < 0) { - wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno)); + wError("vgId:%d, wal:%s, fsync failed(%s)", pWal->vgId, pWal->name, strerror(errno)); } } } -int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) { - SWal *pWal = handle; - struct dirent *ent; - int count = 0; - uint32_t maxId = 0, minId = -1, index =0; +int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *, int32_t)) { + SWal * pWal = handle; + int32_t count = 0; + uint32_t maxId = 0; + uint32_t minId = -1; + uint32_t index = 0; + int32_t code = 0; + struct dirent *ent; terrno = 0; - int plen = strlen(walPrefix); + int32_t plen = strlen(walPrefix); char opath[TSDB_FILENAME_LEN + 5]; - - int slen = snprintf(opath, sizeof(opath), "%s", pWal->path); - if (pWal->keep == 0) strcpy(opath + slen, "/old"); + snprintf(opath, sizeof(opath), "%s", pWal->path); DIR *dir = opendir(opath); if (dir == NULL && errno == ENOENT) return 0; if (dir == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return terrno; + code = TAOS_SYSTEM_ERROR(errno); + return code; } while ((ent = readdir(dir)) != NULL) { @@ -290,6 +138,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) } closedir(dir); + pWal->fileIndex = maxId; if (count == 0) { if (pWal->keep) terrno = walRenew(pWal); @@ -297,10 +146,10 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) } if (count != (maxId - minId + 1)) { - wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId); + wError("vgId:%d, wal:%s, messed up, count:%d max:%d min:%d", pWal->vgId, opath, count, maxId, minId); terrno = TSDB_CODE_WAL_APP_ERROR; } else { - wDebug("wal:%s, %d files will be restored", opath, count); + wDebug("vgId:%d, wal:%s, %d files will be restored", pWal->vgId, opath, count); for (index = minId; index <= maxId; ++index) { snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, index); @@ -314,7 +163,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) terrno = walRemoveWalFiles(opath); if (terrno == 0) { if (remove(opath) < 0) { - wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno)); + wError("vgId:%d, wal:%s, failed to remove directory, reason:%s", pWal->vgId, opath, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); } } @@ -325,7 +174,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, maxId); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); if (pWal->fd < 0) { - wError("wal:%s, failed to open file(%s)", pWal->name, strerror(errno)); + wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); } } @@ -334,9 +183,9 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) return terrno; } -int walGetWalFile(void *handle, char *name, uint32_t *index) { +int32_t walGetWalFile(void *handle, char *name, uint32_t *index) { SWal * pWal = handle; - int code = 1; + int32_t code = 1; int32_t first = 0; name[0] = 0; @@ -359,22 +208,9 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) { return code; } -static void walRelease(SWal *pWal) { - pthread_mutex_destroy(&pWal->mutex); - pWal->signature = NULL; - free(pWal); - - if (atomic_sub_fetch_32(&tsWalNum, 1) == 0) { - if (walTmrCtrl) taosTmrCleanUp(walTmrCtrl); - walTmrCtrl = NULL; - walModuleInit = PTHREAD_ONCE_INIT; - wDebug("WAL module is cleaned up"); - } -} - -static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { - char *name = pWal->name; - int size = 1024 * 1024; // default 1M buffer size +static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { + char * name = pWal->name; + int32_t size = 1024 * 1024; // default 1M buffer size terrno = 0; char *buffer = malloc(size); @@ -385,36 +221,36 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { SWalHead *pHead = (SWalHead *)buffer; - int fd = open(name, O_RDWR); + int32_t fd = open(name, O_RDWR); if (fd < 0) { - wError("wal:%s, failed to open for restore(%s)", name, strerror(errno)); + wError("vgId:%d, wal:%s, failed to open for restore(%s)", pWal->vgId, name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); free(buffer); return terrno; } - wDebug("wal:%s, start to restore", name); + wDebug("vgId:%d, wal:%s, start to restore", pWal->vgId, name); size_t offset = 0; while (1) { - int ret = taosTRead(fd, pHead, sizeof(SWalHead)); + int32_t ret = taosTRead(fd, pHead, sizeof(SWalHead)); if (ret == 0) break; if (ret < 0) { - wError("wal:%s, failed to read wal head part since %s", name, strerror(errno)); + wError("vgId:%d, wal:%s, failed to read wal head part since %s", pWal->vgId, name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); break; } if (ret < sizeof(SWalHead)) { - wError("wal:%s, failed to read head, ret:%d, skip the rest of file", name, ret); + wError("vgId:%d, wal:%s, failed to read head, ret:%d, skip the rest of file", pWal->vgId, name, ret); taosFtruncate(fd, offset); fsync(fd); break; } if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { - wWarn("wal:%s, cksum is messed up, skip the rest of file", name); + wWarn("vgId:%d, wal:%s, cksum is messed up, skip the rest of file", pWal->vgId, name); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(false); break; @@ -433,13 +269,14 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { ret = taosTRead(fd, pHead->cont, pHead->len); if (ret < 0) { - wError("wal:%s failed to read wal body part since %s", name, strerror(errno)); + wError("vgId:%d, wal:%s failed to read wal body part since %s", pWal->vgId, name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); break; } if (ret < pHead->len) { - wError("wal:%s, failed to read body, len:%d ret:%d, skip the rest of file", name, pHead->len, ret); + wError("vgId:%d, wal:%s, failed to read body, len:%d ret:%d, skip the rest of file", pWal->vgId, name, pHead->len, + ret); taosFtruncate(fd, offset); fsync(fd); break; @@ -457,50 +294,9 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { return terrno; } -int walHandleExistingFiles(const char *path) { - char oname[TSDB_FILENAME_LEN * 3]; - char nname[TSDB_FILENAME_LEN * 3]; - char opath[TSDB_FILENAME_LEN]; - - snprintf(opath, sizeof(opath), "%s/old", path); - - struct dirent *ent; - DIR *dir = opendir(path); - int plen = strlen(walPrefix); - terrno = 0; - - int midx = walGetMaxOldFileId(opath); - int count = 0; - while ((ent = readdir(dir)) != NULL) { - if (strncmp(ent->d_name, walPrefix, plen) == 0) { - midx++; - snprintf(oname, sizeof(oname), "%s/%s", path, ent->d_name); - snprintf(nname, sizeof(nname), "%s/old/wal%d", path, midx); - if (taosMkDir(opath, 0755) != 0) { - wError("wal:%s, failed to create directory:%s(%s)", oname, opath, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - break; - } - - if (rename(oname, nname) < 0) { - wError("wal:%s, failed to move to new:%s", oname, nname); - terrno = TAOS_SYSTEM_ERROR(errno); - break; - } - - count++; - } - - wDebug("wal:%s, %d files are moved for restoration", path, count); - } - - closedir(dir); - return terrno; -} - -static int walRemoveWalFiles(const char *path) { - int plen = strlen(walPrefix); - char name[TSDB_FILENAME_LEN * 3]; +static int32_t walRemoveWalFiles(const char *path) { + int32_t plen = strlen(walPrefix); + char name[TSDB_FILENAME_LEN * 3]; terrno = 0; @@ -527,54 +323,9 @@ static int walRemoveWalFiles(const char *path) { return terrno; } -static void walProcessFsyncTimer(void *param, void *tmrId) { - SWal *pWal = param; - - if (pWal->signature != pWal) return; - if (pWal->fd < 0) return; - - if (fsync(pWal->fd) < 0) { - wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno)); - } - - if (walNeedFsyncTimer(pWal)) { - pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl); - } else { - wInfo("wal:%s, stop fsync timer for walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod); - taosTmrStop(pWal->timer); - pWal->timer = NULL; - } -} - int64_t walGetVersion(twalh param) { SWal *pWal = param; if (pWal == 0) return 0; return pWal->version; } - -static int walGetMaxOldFileId(char *odir) { - int midx = 0; - DIR * dir = NULL; - struct dirent *dp = NULL; - int plen = strlen(walPrefix); - - if (access(odir, F_OK) != 0) return midx; - - dir = opendir(odir); - if (dir == NULL) { - wError("failed to open directory %s since %s", odir, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - while ((dp = readdir(dir)) != NULL) { - if (strncmp(dp->d_name, walPrefix, plen) == 0) { - int idx = atol(dp->d_name + plen); - if (midx < idx) midx = idx; - } - } - - closedir(dir); - return midx; -} \ No newline at end of file From d4fbd7bbab95c3b5f9e85b82d492813b74957ff0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 30 Oct 2020 18:08:48 +0800 Subject: [PATCH 02/12] TD-1846 --- src/dnode/src/dnodeMain.c | 2 + src/inc/tsync.h | 2 +- src/inc/twal.h | 10 +- src/mnode/src/mnodeSdb.c | 4 +- src/sync/src/syncRetrieve.c | 4 +- src/sync/test/syncServer.c | 2 +- src/util/inc/talloc.h | 3 +- src/util/src/talloc.c | 10 ++ src/vnode/src/vnodeMain.c | 5 +- src/wal/inc/walInt.h | 21 ++- src/wal/src/walMgmt.c | 91 ++++++------ src/wal/src/walWrite.c | 281 ++++++++++++++++-------------------- src/wal/test/waltest.c | 6 +- 13 files changed, 209 insertions(+), 232 deletions(-) diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 97e6f2ce6d..c46099bd14 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -19,6 +19,7 @@ #include "tutil.h" #include "tconfig.h" #include "tglobal.h" +#include "twal.h" #include "dnode.h" #include "dnodeInt.h" #include "dnodeMgmt.h" @@ -50,6 +51,7 @@ typedef struct { static const SDnodeComponent tsDnodeComponents[] = { {"storage", dnodeInitStorage, dnodeCleanupStorage}, + {"wal", walInit, walCleanUp}, {"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!! {"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, {"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite}, diff --git a/src/inc/tsync.h b/src/inc/tsync.h index ca0f70d104..11b81f9379 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -68,7 +68,7 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin // get the wal file from index or after // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file -typedef int (*FGetWalInfo)(void *ahandle, char *name, uint32_t *index); +typedef int (*FGetWalInfo)(void *ahandle, char *name, int64_t *index); // when a forward pkt is received, call this to handle data typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type); diff --git a/src/inc/twal.h b/src/inc/twal.h index 626c4434e9..b87831381d 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -51,13 +51,13 @@ int32_t walInit(); void walCleanUp(); twalh walOpen(char *path, SWalCfg *pCfg); -int walAlter(twalh pWal, SWalCfg *pCfg); +int32_t walAlter(twalh pWal, SWalCfg *pCfg); void walClose(twalh); -int walRenew(twalh); -int walWrite(twalh, SWalHead *); +int32_t walRenew(twalh); +int32_t walWrite(twalh, SWalHead *); void walFsync(twalh); -int walRestore(twalh, void *pVnode, FWalWrite writeFp); -int walGetWalFile(twalh, char *name, uint32_t *index); +int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); +int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId); int64_t walGetVersion(twalh); #ifdef __cplusplus diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 8c61c61a10..a04c161599 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -175,7 +175,7 @@ static void *sdbGetTableFromId(int32_t tableId) { } static int32_t sdbInitWal() { - SWalCfg walCfg = {.walLevel = 2, .wals = 2, .keep = 1, .fsyncPeriod = 0}; + SWalCfg walCfg = {.vgId = 1, .walLevel = 2, .wals = 2, .keep = 1, .fsyncPeriod = 0}; char temp[TSDB_FILENAME_LEN]; sprintf(temp, "%s/wal", tsMnodeDir); tsSdbObj.wal = walOpen(temp, &walCfg); @@ -237,7 +237,7 @@ static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint3 return 0; } -static int sdbGetWalInfo(void *ahandle, char *name, uint32_t *index) { +static int sdbGetWalInfo(void *ahandle, char *name, int64_t *index) { return walGetWalFile(tsSdbObj.wal, name, index); } diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 60625d75ec..0137794d18 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -287,7 +287,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, return -1; } -static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) { +static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) { SSyncNode *pNode = pPeer->pSyncNode; int code = -1; char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file @@ -377,7 +377,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) { int32_t size; struct stat fstat; int code = -1; - uint32_t index = 0; + int64_t index = 0; while (1) { // retrieve wal info diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index 380b971fa8..9ae45b25e3 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -254,7 +254,7 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex return magic; } -int getWalInfo(void *ahandle, char *name, uint32_t *index) { +int getWalInfo(void *ahandle, char *name, uint64_t *index) { struct stat fstat; char aname[280]; diff --git a/src/util/inc/talloc.h b/src/util/inc/talloc.h index 5f04f4b99c..a59e88cf13 100644 --- a/src/util/inc/talloc.h +++ b/src/util/inc/talloc.h @@ -23,18 +23,19 @@ extern "C" { #ifdef TSDB_USE_SYS_MEM #define tmalloc(size) malloc(size) #define tcalloc(size) calloc(1, size) + #define trealloc(p, size) realloc(p, size) #define tmemalign(alignment, size) malloc(size) #define tfree(p) free(p) #define tmemzero(p, size) memset(p, 0, size) #else void *tmalloc(int32_t size); void *tcalloc(int32_t size); + void *trealloc(void *p, int32_t size); void *tmemalign(int32_t alignment, int32_t size); void tfree(void *p); void tmemzero(void *p, int32_t size); #endif - #ifdef __cplusplus } #endif diff --git a/src/util/src/talloc.c b/src/util/src/talloc.c index 4f7067a26a..6f23b39393 100644 --- a/src/util/src/talloc.c +++ b/src/util/src/talloc.c @@ -40,6 +40,16 @@ void *tcalloc(int32_t size) { return p; } +void *trealloc(void *p, int32_t size) { + p = realloc(p, size); + if (p == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to realloc memory, size:%d reason:%s", size, strerror(errno)); + } + + return p; +} + void tfree(void *p) { free(p); } void tmemzero(void *p, int32_t size) { memset(p, 0, size); } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 9c283dd9c4..7308027817 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -42,7 +42,7 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode); static int32_t vnodeReadVersion(SVnodeObj *pVnode); static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); -static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); +static int vnodeGetWalInfo(void *ahandle, char *name, int64_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeCtrlFlow(void *handle, int32_t mseconds); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); @@ -304,6 +304,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { sprintf(temp, "%s/wal", rootDir); pVnode->wal = walOpen(temp, &pVnode->walCfg); + pVnode->walCfg.vgId = pVnode->vgId; if (pVnode->wal == NULL) { vnodeCleanUp(pVnode); return terrno; @@ -621,7 +622,7 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); } -static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) { +static int vnodeGetWalInfo(void *ahandle, char *name, int64_t *index) { SVnodeObj *pVnode = ahandle; return walGetWalFile(pVnode->wal, name, index); } diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index fc6955ab97..3c34c68f83 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -31,9 +31,13 @@ extern int32_t wDebugFlag; #define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} -#define walPrefix "wal" -#define walRefreshIntervalMs 1000 -#define walSignature (uint32_t)(0xFAFBFDFE) +#define WAL_PREFIX "wal" +#define WAL_PREFIX_LEN 4 +#define WAL_REFRESH_MS 1000 +#define WAL_MAX_SIZE (1024 * 1024) +#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) +#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) +#define WAL_FILE_LEN (TSDB_FILENAME_LEN + 32) typedef struct { uint64_t version; @@ -43,14 +47,9 @@ typedef struct { int32_t level; int32_t fsyncPeriod; int32_t fsyncSeq; - int32_t fileIndex; - void* timer; - void* signature; - int max; // maximum number of wal files - uint32_t id; // increase continuously - int num; // number of wal files - char path[TSDB_FILENAME_LEN]; - char name[TSDB_FILENAME_LEN + 16]; + int64_t fileId; + char path[WAL_PATH_LEN]; + char name[WAL_FILE_LEN]; pthread_mutex_t mutex; } SWal; diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index ebd1328c1b..b1e32496cc 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -27,7 +27,6 @@ typedef struct { int32_t num; int32_t seq; int8_t stop; - int8_t reserved[3]; pthread_t thread; pthread_mutex_t mutex; } SWalMgmt; @@ -48,7 +47,7 @@ int32_t walInit() { return code; } - wInfo("wal module is initialized"); + wInfo("wal module is initialized, refId:%d", tsWal.refId); return code; } @@ -65,15 +64,13 @@ void *walOpen(char *path, SWalCfg *pCfg) { return NULL; } + pWal->vgId = pCfg->vgId; pWal->fd = -1; - pWal->max = pCfg->wals; - pWal->id = 0; - pWal->num = 0; + pWal->fileId = -1; pWal->level = pCfg->walLevel; pWal->keep = pCfg->keep; pWal->fsyncPeriod = pCfg->fsyncPeriod; - pWal->signature = pWal; - tstrncpy(pWal->path, path, sizeof(path)); + tstrncpy(pWal->path, path, sizeof(pWal->path)); pthread_mutex_init(&pWal->mutex, NULL); pWal->fsyncSeq = pCfg->fsyncPeriod % 1000; @@ -90,12 +87,14 @@ void *walOpen(char *path, SWalCfg *pCfg) { } atomic_add_fetch_32(&tsWal.num, 1); - wDebug("vgId:%d, wal is opened, level:%d period:%d path:%s", pWal->vgId, pWal->level, pWal->fsyncPeriod, pWal->path); + wDebug("vgId:%d, wal:%p is opened, level:%d period:%d path:%s", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod, + pWal->path); return pWal; } int32_t walAlter(void *handle, SWalCfg *pCfg) { + if (handle == NULL) return TSDB_CODE_WAL_APP_ERROR; SWal *pWal = handle; if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) { @@ -109,26 +108,27 @@ int32_t walAlter(void *handle, SWalCfg *pCfg) { pWal->level = pCfg->walLevel; pWal->fsyncPeriod = pCfg->fsyncPeriod; + pWal->fsyncSeq = pCfg->fsyncPeriod % 1000; + if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; return TSDB_CODE_SUCCESS; } void walClose(void *handle) { + if (handle == NULL) return; + SWal *pWal = handle; taosClose(pWal->fd); if (pWal->keep == 0) { - // remove all files in the directory - for (int32_t i = 0; i < pWal->num; ++i) { - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id - i); - if (remove(pWal->name) < 0) { - wError("vgId:%d, wal:%s, failed to remove", pWal->vgId, pWal->name); - } else { - wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, pWal->name); - } + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, pWal->fileId); + if (remove(pWal->name) < 0) { + wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); + } else { + wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name); } } else { - wDebug("vgId:%d, wal:%s, it is closed and kept", pWal->vgId, pWal->name); + wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name); } taosRemoveRef(tsWal.refId, pWal); @@ -145,6 +145,8 @@ static int32_t walInitObj(SWal *pWal) { return TSDB_CODE_SUCCESS; } + walRenew(pWal); + if (pWal && pWal->fd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->path, strerror(errno)); @@ -156,53 +158,45 @@ static int32_t walInitObj(SWal *pWal) { } static void walFreeObj(void *wal) { - SWal *pWal = pWal; - wDebug("vgId:%d, wal is freed", pWal->vgId); + SWal *pWal = wal; + wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); taosClose(pWal->fd); pthread_mutex_destroy(&pWal->mutex); tfree(pWal); } -// static bool walNeedFsync(SWal *pWal) { -// if (pWal->fsyncPeriod <= 0 || pWal->level != TAOS_WAL_FSYNC) { -// return false; -// } +static bool walNeedFsync(SWal *pWal) { + if (pWal->fsyncPeriod <= 0 || pWal->level != TAOS_WAL_FSYNC) { + return false; + } -// if (tsWal.seq % pWal->fsyncSeq == 0) { -// return true; -// } + if (tsWal.seq % pWal->fsyncSeq == 0) { + return true; + } -// return false; -// } + return false; +} static void walUpdateSeq() { - taosMsleep(walRefreshIntervalMs); + taosMsleep(WAL_REFRESH_MS); if (++tsWal.seq <= 0) { tsWal.seq = 1; } } static void walFsyncAll() { - // int32_t code; - // void * pIter = taosRefCreateIter(tsWal.refId); - - // while (taosRefIterNext(pIter)) { - // SWal *pWal = taosRefIterGet(pIter); - // if (pWal == NULL) break; - - // if (!walNeedFsync(pWal)) { - // wTrace("wal:%s, do fsync, level:%d seq:%d rseq:%d", pWal->name, pWal->level, pWal->fsyncSeq, tsWal.refreshSeq); - // code = walFsync(pWal); - // if (code != TSDB_CODE_SUCCESS) { - // wError("wal:%s, fsync failed(%s)", pWal->name, strerror(code)); - // } - // } - - // taosReleaseRef(pWal); - // } - - // taosRefDestroyIter(pIter); + SWal *pWal = taosIterateRef(tsWal.refId, NULL); + while (pWal) { + if (walNeedFsync(pWal)) { + wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); + int32_t code = fsync(pWal->fd); + if (code != 0) { + wError("vgId:%d, wal:%s, fsync failed, reason:%s", pWal->vgId, pWal->name, strerror(code)); + } + } + pWal = taosIterateRef(tsWal.refId, pWal); + } } static void *walThreadFunc(void *param) { @@ -232,6 +226,7 @@ static int32_t walCreateThread() { } static void walStopThread() { + tsWal.stop = 1; if (tsWal.thread) { pthread_join(tsWal.thread, NULL); } diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 2eea80e20a..6c3c97a0e9 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -23,77 +23,79 @@ #include "twal.h" #include "walInt.h" -static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp); -static int32_t walRemoveWalFiles(const char *path); +static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name); +static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId, char *nextFileName); int32_t walRenew(void *handle) { if (handle == NULL) return 0; - SWal *pWal = handle; - terrno = 0; + SWal * pWal = handle; + int32_t code = 0; pthread_mutex_lock(&pWal->mutex); if (pWal->fd >= 0) { close(pWal->fd); - pWal->id++; wDebug("vgId:%d, wal:%s, it is closed", pWal->vgId, pWal->name); } - pWal->num++; + uint64_t lastId = pWal->fileId; + if (pWal->keep) { + pWal->fileId = 0; + } else { + pWal->fileId = taosGetTimestampUs(); + } - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id); + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, pWal->fileId); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); if (pWal->fd < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal:%s, failed to open, reason:%s", pWal->vgId, pWal->name, strerror(errno)); + code = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); } else { wDebug("vgId:%d, wal:%s, it is created", pWal->vgId, pWal->name); + } - if (pWal->num > pWal->max) { - // remove the oldest wal file - char name[TSDB_FILENAME_LEN * 3]; - snprintf(name, sizeof(name), "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max); - if (remove(name) < 0) { - wError("vgId:%d, wal:%s, failed to remove(%s)", pWal->vgId, name, strerror(errno)); - } else { - wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, name); - } - - pWal->num--; + if (pWal->keep != 1 && lastId != -1) { + // remove last wal file + char name[TSDB_FILENAME_LEN + 20]; + snprintf(name, sizeof(name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, lastId); + if (remove(name) < 0) { + wError("vgId:%d, wal:%s, failed to remove since %s", pWal->vgId, pWal->name, strerror(errno)); + } else { + wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, pWal->name); } } pthread_mutex_unlock(&pWal->mutex); - return terrno; + return code; } int32_t walWrite(void *handle, SWalHead *pHead) { - SWal *pWal = handle; - if (pWal == NULL) return -1; + if (handle == NULL) return -1; - terrno = 0; + SWal * pWal = handle; + int32_t code = 0; // no wal if (pWal->level == TAOS_WAL_NOLOG) return 0; if (pHead->version <= pWal->version) return 0; - pHead->signature = walSignature; + pHead->signature = WAL_SIGNATURE; taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); int32_t contLen = pHead->len + sizeof(SWalHead); if (taosTWrite(pWal->fd, pHead, contLen) != contLen) { - terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal:%s, failed to write(%s)", pWal->vgId, pWal->name, strerror(errno)); - return terrno; + code = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); } else { pWal->version = pHead->version; } + ASSERT(contLen == pHead->len + sizeof(SWalHead)); - return 0; + return code; } void walFsync(void *handle) { @@ -102,156 +104,116 @@ void walFsync(void *handle) { if (pWal->fsyncPeriod == 0) { if (fsync(pWal->fd) < 0) { - wError("vgId:%d, wal:%s, fsync failed(%s)", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, wal:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno)); } } } int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *, int32_t)) { - SWal * pWal = handle; - int32_t count = 0; - uint32_t maxId = 0; - uint32_t minId = -1; - uint32_t index = 0; - int32_t code = 0; - struct dirent *ent; + if (handle == NULL) return -1; - terrno = 0; - int32_t plen = strlen(walPrefix); - char opath[TSDB_FILENAME_LEN + 5]; - snprintf(opath, sizeof(opath), "%s", pWal->path); + SWal * pWal = handle; + int32_t count = 0; - DIR *dir = opendir(opath); + DIR *dir = opendir(pWal->path); if (dir == NULL && errno == ENOENT) return 0; - if (dir == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - return code; - } + if (dir == NULL) return TAOS_SYSTEM_ERROR(errno); + struct dirent *ent; while ((ent = readdir(dir)) != NULL) { - if (strncmp(ent->d_name, walPrefix, plen) == 0) { - index = atol(ent->d_name + plen); - if (index > maxId) maxId = index; - if (index < minId) minId = index; + char *fileName = ent->d_name; + + if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { + uint64_t fileId = atoll(fileName + WAL_PREFIX_LEN); + if (fileId == pWal->fileId) continue; + + wDebug("vgId:%d, wal:%s, will be restored", pWal->vgId, fileName); + + int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, fileName); + if (code != TSDB_CODE_SUCCESS) continue; + + wInfo("vgId:%d, wal:%s, restore success, remove this file", pWal->vgId, fileName); + remove(fileName); + count++; } } - closedir(dir); - pWal->fileIndex = maxId; - if (count == 0) { - if (pWal->keep) terrno = walRenew(pWal); - return terrno; - } - - if (count != (maxId - minId + 1)) { - wError("vgId:%d, wal:%s, messed up, count:%d max:%d min:%d", pWal->vgId, opath, count, maxId, minId); - terrno = TSDB_CODE_WAL_APP_ERROR; - } else { - wDebug("vgId:%d, wal:%s, %d files will be restored", pWal->vgId, opath, count); - - for (index = minId; index <= maxId; ++index) { - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, index); - terrno = walRestoreWalFile(pWal, pVnode, writeFp); - if (terrno < 0) continue; - } - } - - if (terrno == 0) { - if (pWal->keep == 0) { - terrno = walRemoveWalFiles(opath); - if (terrno == 0) { - if (remove(opath) < 0) { - wError("vgId:%d, wal:%s, failed to remove directory, reason:%s", pWal->vgId, opath, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - } - } + if (pWal->keep) { + if (count == 0) { + return walRenew(pWal); } else { // open the existing WAL file in append mode - pWal->num = count; - pWal->id = maxId; - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, maxId); + pWal->fileId = 0; + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); if (pWal->fd < 0) { wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->name, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + return TAOS_SYSTEM_ERROR(errno); } } } - return terrno; + return TSDB_CODE_SUCCESS; } -int32_t walGetWalFile(void *handle, char *name, uint32_t *index) { - SWal * pWal = handle; - int32_t code = 1; - int32_t first = 0; - - name[0] = 0; - if (pWal == NULL || pWal->num == 0) return 0; +int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { + if (handle == NULL) return -1; + SWal *pWal = handle; pthread_mutex_lock(&(pWal->mutex)); - - first = pWal->id + 1 - pWal->num; - if (*index == 0) *index = first; // set to first one - - if (*index < first && *index > pWal->id) { - code = -1; // index out of range - } else { - sprintf(name, "wal/%s%d", walPrefix, *index); - code = (*index == pWal->id) ? 0 : 1; + int32_t code = walGetNextFile(pWal, *fileId, fileId, fileName); + if (code == 0) { + code = (*fileId == pWal->fileId) ? 0 : 1; } - pthread_mutex_unlock(&(pWal->mutex)); return code; } -static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { - char * name = pWal->name; - int32_t size = 1024 * 1024; // default 1M buffer size - - terrno = 0; - char *buffer = malloc(size); +static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name) { + int32_t size = WAL_MAX_SIZE; + void * buffer = tmalloc(size); if (buffer == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return terrno; + wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); + return TAOS_SYSTEM_ERROR(errno); } - SWalHead *pHead = (SWalHead *)buffer; - int32_t fd = open(name, O_RDWR); if (fd < 0) { - wError("vgId:%d, wal:%s, failed to open for restore(%s)", pWal->vgId, name, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - free(buffer); - return terrno; + wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); + tfree(buffer); + return TAOS_SYSTEM_ERROR(errno); } wDebug("vgId:%d, wal:%s, start to restore", pWal->vgId, name); - size_t offset = 0; + int32_t code = TSDB_CODE_SUCCESS; + size_t offset = 0; + SWalHead *pHead = buffer; + while (1) { int32_t ret = taosTRead(fd, pHead, sizeof(SWalHead)); if (ret == 0) break; if (ret < 0) { wError("vgId:%d, wal:%s, failed to read wal head part since %s", pWal->vgId, name, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); break; } if (ret < sizeof(SWalHead)) { - wError("vgId:%d, wal:%s, failed to read head, ret:%d, skip the rest of file", pWal->vgId, name, ret); + wError("vgId:%d, wal:%s, failed to read wal head since %s, read size:%d, skip the rest of file", pWal->vgId, + name, strerror(errno), ret); taosFtruncate(fd, offset); fsync(fd); break; } if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { - wWarn("vgId:%d, wal:%s, cksum is messed up, skip the rest of file", pWal->vgId, name); - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + wError("vgId:%d, wal:%s, wal head cksum is messed up, skip the rest of file", pWal->vgId, name); + code = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(false); break; } @@ -260,23 +222,24 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { size = sizeof(SWalHead) + pHead->len; buffer = realloc(buffer, size); if (buffer == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); + code = TAOS_SYSTEM_ERROR(errno); break; } - pHead = (SWalHead *)buffer; + pHead = buffer; } ret = taosTRead(fd, pHead->cont, pHead->len); if (ret < 0) { wError("vgId:%d, wal:%s failed to read wal body part since %s", pWal->vgId, name, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); break; } if (ret < pHead->len) { - wError("vgId:%d, wal:%s, failed to read body, len:%d ret:%d, skip the rest of file", pWal->vgId, name, pHead->len, - ret); + wError("vgId:%d, wal:%s, failed to read body since %s, read size:%d len:%d , skip the rest of file", pWal->vgId, + name, strerror(errno), ret, pHead->len); taosFtruncate(fd, offset); fsync(fd); break; @@ -289,38 +252,9 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { } close(fd); - free(buffer); + tfree(buffer); - return terrno; -} - -static int32_t walRemoveWalFiles(const char *path) { - int32_t plen = strlen(walPrefix); - char name[TSDB_FILENAME_LEN * 3]; - - terrno = 0; - - struct dirent *ent; - DIR *dir = opendir(path); - if (dir == NULL && errno == ENOENT) return 0; - if (dir == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return terrno; - } - - while ((ent = readdir(dir)) != NULL) { - if (strncmp(ent->d_name, walPrefix, plen) == 0) { - snprintf(name, sizeof(name), "%s/%s", path, ent->d_name); - if (remove(name) < 0) { - wError("wal:%s, failed to remove(%s)", name, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - } - } - } - - closedir(dir); - - return terrno; + return code; } int64_t walGetVersion(twalh param) { @@ -329,3 +263,38 @@ int64_t walGetVersion(twalh param) { return pWal->version; } + +static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId, char *nextFileName) { + int64_t nearFileId = INT64_MAX; + char nearFileName[WAL_FILE_LEN] = {0}; + + DIR *dir = opendir(pWal->path); + if (dir == NULL) { + wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + return -1; + } + + struct dirent *ent; + while ((ent = readdir(dir)) != NULL) { + char *fileName = ent->d_name; + + if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { + uint64_t fileId = atoll(fileName + WAL_PREFIX_LEN); + if (fileId <= lastFileId) continue; + + if (fileId < nearFileId) { + nearFileId = fileId; + tstrncpy(nearFileName, fileName, WAL_FILE_LEN); + } + } + } + closedir(dir); + + if (nearFileId == INT64_MAX) return -1; + + *nexFileId = nearFileId; + tstrncpy(nextFileName, nearFileName, WAL_FILE_LEN); + wTrace("vgId:%d, path:%s, lastfile %" PRId64 ", nextfile is %s", pWal->vgId, pWal->path, lastFileId, nextFileName); + + return 0; +} diff --git a/src/wal/test/waltest.c b/src/wal/test/waltest.c index bbee1347b8..ba0011af29 100644 --- a/src/wal/test/waltest.c +++ b/src/wal/test/waltest.c @@ -115,17 +115,17 @@ int main(int argc, char *argv[]) { printf("%d wal files are written\n", total); - uint32_t index = 0; + int64_t index = 0; char name[256]; while (1) { int code = walGetWalFile(pWal, name, &index); if (code == -1) { - printf("failed to get wal file, index:%d\n", index); + printf("failed to get wal file, index:%" PRId64 "\n", index); break; } - printf("index:%d wal:%s\n", index, name); + printf("index:%" PRId64 " wal:%s\n", index, name); if (code == 0) break; index++; From 8e1ddeeaf2cbec355e51e52e4114f2c381f35199 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 30 Oct 2020 19:25:04 +0800 Subject: [PATCH 03/12] TD-1846 --- src/vnode/src/vnodeMain.c | 2 +- src/wal/inc/walInt.h | 2 +- src/wal/src/walMgmt.c | 21 ++++++------ src/wal/src/walWrite.c | 67 +++++++++++++++++++++++---------------- 4 files changed, 52 insertions(+), 40 deletions(-) diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 7308027817..2d5ec98a80 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -303,8 +303,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { } sprintf(temp, "%s/wal", rootDir); - pVnode->wal = walOpen(temp, &pVnode->walCfg); pVnode->walCfg.vgId = pVnode->vgId; + pVnode->wal = walOpen(temp, &pVnode->walCfg); if (pVnode->wal == NULL) { vnodeCleanUp(pVnode); return terrno; diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index 3c34c68f83..8b5784d1ce 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -32,7 +32,7 @@ extern int32_t wDebugFlag; #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define WAL_PREFIX "wal" -#define WAL_PREFIX_LEN 4 +#define WAL_PREFIX_LEN 3 #define WAL_REFRESH_MS 1000 #define WAL_MAX_SIZE (1024 * 1024) #define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index b1e32496cc..72c0ef21a2 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -43,7 +43,7 @@ int32_t walInit() { int32_t code = walCreateThread(); if (code != TSDB_CODE_SUCCESS) { - wError("failed to init wal module, reason:%s", tstrerror(code)); + wError("failed to init wal module since %s", tstrerror(code)); return code; } @@ -87,8 +87,7 @@ void *walOpen(char *path, SWalCfg *pCfg) { } atomic_add_fetch_32(&tsWal.num, 1); - wDebug("vgId:%d, wal:%p is opened, level:%d period:%d path:%s", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod, - pWal->path); + wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod); return pWal; } @@ -120,8 +119,8 @@ void walClose(void *handle) { SWal *pWal = handle; taosClose(pWal->fd); - if (pWal->keep == 0) { - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, pWal->fileId); + if (!pWal->keep) { + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); if (remove(pWal->name) < 0) { wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); } else { @@ -137,11 +136,11 @@ void walClose(void *handle) { static int32_t walInitObj(SWal *pWal) { if (taosMkDir(pWal->path, 0755) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal:%s, failed to create directory, reason:%s", pWal->vgId, pWal->path, strerror(errno)); + wError("vgId:%d, file:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno)); return terrno; } - if (pWal->keep == 1) { + if (pWal->keep) { return TSDB_CODE_SUCCESS; } @@ -149,11 +148,11 @@ static int32_t walInitObj(SWal *pWal) { if (pWal && pWal->fd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->path, strerror(errno)); + wError("vgId:%d, file:%s, failed to open file since %s", pWal->vgId, pWal->path, strerror(errno)); return terrno; } - wDebug("vgId:%d, wal:%s, is initialized", pWal->vgId, pWal->name); + wDebug("vgId:%d, file is initialized", pWal->vgId); return TSDB_CODE_SUCCESS; } @@ -192,7 +191,7 @@ static void walFsyncAll() { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); int32_t code = fsync(pWal->fd); if (code != 0) { - wError("vgId:%d, wal:%s, fsync failed, reason:%s", pWal->vgId, pWal->name, strerror(code)); + wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(code)); } } pWal = taosIterateRef(tsWal.refId, pWal); @@ -215,7 +214,7 @@ static int32_t walCreateThread() { pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) { - wError("failed to create wal thread, reason:%s", strerror(errno)); + wError("failed to create wal thread since %s", strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 6c3c97a0e9..45f00def06 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -36,34 +36,34 @@ int32_t walRenew(void *handle) { if (pWal->fd >= 0) { close(pWal->fd); - wDebug("vgId:%d, wal:%s, it is closed", pWal->vgId, pWal->name); + wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name); } - uint64_t lastId = pWal->fileId; + int64_t lastId = pWal->fileId; if (pWal->keep) { pWal->fileId = 0; } else { pWal->fileId = taosGetTimestampUs(); } - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, pWal->fileId); + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); if (pWal->fd < 0) { code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); } else { - wDebug("vgId:%d, wal:%s, it is created", pWal->vgId, pWal->name); + wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name); } - if (pWal->keep != 1 && lastId != -1) { + if (!pWal->keep && lastId != -1) { // remove last wal file char name[TSDB_FILENAME_LEN + 20]; - snprintf(name, sizeof(name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, lastId); + snprintf(name, sizeof(name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, lastId); if (remove(name) < 0) { - wError("vgId:%d, wal:%s, failed to remove since %s", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, name, strerror(errno)); } else { - wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, pWal->name); + wDebug("vgId:%d, file:%s, it is removed", pWal->vgId, name); } } @@ -88,9 +88,10 @@ int32_t walWrite(void *handle, SWalHead *pHead) { if (taosTWrite(pWal->fd, pHead, contLen) != contLen) { code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); } else { pWal->version = pHead->version; + wTrace("vgId:%d, write version:%" PRId64 ", fileId:%" PRId64, pWal->vgId, pWal->version, pWal->fileId); } ASSERT(contLen == pHead->len + sizeof(SWalHead)); @@ -104,7 +105,7 @@ void walFsync(void *handle) { if (pWal->fsyncPeriod == 0) { if (fsync(pWal->fd) < 0) { - wError("vgId:%d, wal:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno)); } } } @@ -124,16 +125,23 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void * char *fileName = ent->d_name; if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { - uint64_t fileId = atoll(fileName + WAL_PREFIX_LEN); + int64_t fileId = atoll(fileName + WAL_PREFIX_LEN); if (fileId == pWal->fileId) continue; - wDebug("vgId:%d, wal:%s, will be restored", pWal->vgId, fileName); + char walName[WAL_FILE_LEN]; + snprintf(walName, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); - int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, fileName); + wDebug("vgId:%d, file:%s, will be restored", pWal->vgId, walName); + int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, walName); if (code != TSDB_CODE_SUCCESS) continue; - wInfo("vgId:%d, wal:%s, restore success, remove this file", pWal->vgId, fileName); - remove(fileName); + + if (!pWal->keep) { + wDebug("vgId:%d, file:%s, restore success, remove this file", pWal->vgId, walName); + remove(walName); + } else { + wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName); + } count++; } @@ -142,6 +150,7 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void * if (pWal->keep) { if (count == 0) { + wDebug("vgId:%d, file:%s not exist, renew it", pWal->vgId, pWal->name); return walRenew(pWal); } else { // open the existing WAL file in append mode @@ -149,9 +158,10 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void * snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); if (pWal->fd < 0) { - wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } + wDebug("vgId:%d, file:%s open success", pWal->vgId, pWal->name); } } @@ -176,18 +186,18 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch int32_t size = WAL_MAX_SIZE; void * buffer = tmalloc(size); if (buffer == NULL) { - wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); + wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } int32_t fd = open(name, O_RDWR); if (fd < 0) { - wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); + wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); tfree(buffer); return TAOS_SYSTEM_ERROR(errno); } - wDebug("vgId:%d, wal:%s, start to restore", pWal->vgId, name); + wDebug("vgId:%d, file:%s, start to restore", pWal->vgId, name); int32_t code = TSDB_CODE_SUCCESS; size_t offset = 0; @@ -198,13 +208,13 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch if (ret == 0) break; if (ret < 0) { - wError("vgId:%d, wal:%s, failed to read wal head part since %s", pWal->vgId, name, strerror(errno)); + wError("vgId:%d, file:%s, failed to read wal head part since %s", pWal->vgId, name, strerror(errno)); code = TAOS_SYSTEM_ERROR(errno); break; } if (ret < sizeof(SWalHead)) { - wError("vgId:%d, wal:%s, failed to read wal head since %s, read size:%d, skip the rest of file", pWal->vgId, + wError("vgId:%d, file:%s, failed to read wal head since %s, read size:%d, skip the rest of file", pWal->vgId, name, strerror(errno), ret); taosFtruncate(fd, offset); fsync(fd); @@ -212,7 +222,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch } if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { - wError("vgId:%d, wal:%s, wal head cksum is messed up, skip the rest of file", pWal->vgId, name); + wError("vgId:%d, file:%s, wal head cksum is messed up, skip the rest of file", pWal->vgId, name); code = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(false); break; @@ -222,7 +232,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch size = sizeof(SWalHead) + pHead->len; buffer = realloc(buffer, size); if (buffer == NULL) { - wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); + wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); code = TAOS_SYSTEM_ERROR(errno); break; } @@ -232,13 +242,13 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ret = taosTRead(fd, pHead->cont, pHead->len); if (ret < 0) { - wError("vgId:%d, wal:%s failed to read wal body part since %s", pWal->vgId, name, strerror(errno)); + wError("vgId:%d, file:%s failed to read wal body part since %s", pWal->vgId, name, strerror(errno)); code = TAOS_SYSTEM_ERROR(errno); break; } if (ret < pHead->len) { - wError("vgId:%d, wal:%s, failed to read body since %s, read size:%d len:%d , skip the rest of file", pWal->vgId, + wError("vgId:%d, file:%s, failed to read body since %s, read size:%d len:%d , skip the rest of file", pWal->vgId, name, strerror(errno), ret, pHead->len); taosFtruncate(fd, offset); fsync(fd); @@ -248,6 +258,9 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch offset = offset + sizeof(SWalHead) + pHead->len; if (pWal->keep) pWal->version = pHead->version; + + wTrace("vgId:%d, restore version:%" PRIu64 ", fileId:%" PRId64, pWal->vgId, pWal->version, pWal->fileId); + (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL); } @@ -279,7 +292,7 @@ static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId char *fileName = ent->d_name; if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { - uint64_t fileId = atoll(fileName + WAL_PREFIX_LEN); + int64_t fileId = atoll(fileName + WAL_PREFIX_LEN); if (fileId <= lastFileId) continue; if (fileId < nearFileId) { From 30c7c2c0f60080e031e7bd1a5764a15fbbd80f91 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 30 Oct 2020 19:42:21 +0800 Subject: [PATCH 04/12] TD-1856 --- src/inc/tsync.h | 2 +- src/inc/twal.h | 1 - src/mnode/src/mnodeSdb.c | 4 ++-- src/sync/test/syncServer.c | 2 +- src/vnode/src/vnodeMain.c | 6 +++--- src/wal/src/walWrite.c | 2 +- src/wal/test/waltest.c | 2 +- 7 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 11b81f9379..671adefab8 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -68,7 +68,7 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin // get the wal file from index or after // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file -typedef int (*FGetWalInfo)(void *ahandle, char *name, int64_t *index); +typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId); // when a forward pkt is received, call this to handle data typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type); diff --git a/src/inc/twal.h b/src/inc/twal.h index b87831381d..94bdcacfce 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -41,7 +41,6 @@ typedef struct { int8_t walLevel; // wal level int8_t wals; // number of WAL files; int8_t keep; // keep the wal file when closed - int8_t reserved[5]; } SWalCfg; typedef void* twalh; // WAL HANDLE diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index a04c161599..e3c13505e4 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -237,8 +237,8 @@ static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint3 return 0; } -static int sdbGetWalInfo(void *ahandle, char *name, int64_t *index) { - return walGetWalFile(tsSdbObj.wal, name, index); +static int32_t sdbGetWalInfo(void *ahandle, char *fileName, int64_t *fileId) { + return walGetWalFile(tsSdbObj.wal, fileName, fileId); } static void sdbNotifyRole(void *ahandle, int8_t role) { diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index 9ae45b25e3..0cf752da97 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -254,7 +254,7 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex return magic; } -int getWalInfo(void *ahandle, char *name, uint64_t *index) { +int getWalInfo(void *ahandle, char *name, int64_t *index) { struct stat fstat; char aname[280]; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 2d5ec98a80..40eeeb4ed7 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -42,7 +42,7 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode); static int32_t vnodeReadVersion(SVnodeObj *pVnode); static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); -static int vnodeGetWalInfo(void *ahandle, char *name, int64_t *index); +static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId); static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeCtrlFlow(void *handle, int32_t mseconds); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); @@ -622,9 +622,9 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); } -static int vnodeGetWalInfo(void *ahandle, char *name, int64_t *index) { +static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId) { SVnodeObj *pVnode = ahandle; - return walGetWalFile(pVnode->wal, name, index); + return walGetWalFile(pVnode->wal, fileName, fileId); } static void vnodeNotifyRole(void *ahandle, int8_t role) { diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 45f00def06..70bcc49593 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -58,7 +58,7 @@ int32_t walRenew(void *handle) { if (!pWal->keep && lastId != -1) { // remove last wal file - char name[TSDB_FILENAME_LEN + 20]; + char name[WAL_FILE_LEN]; snprintf(name, sizeof(name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, lastId); if (remove(name) < 0) { wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, name, strerror(errno)); diff --git a/src/wal/test/waltest.c b/src/wal/test/waltest.c index ba0011af29..186f2ef5ff 100644 --- a/src/wal/test/waltest.c +++ b/src/wal/test/waltest.c @@ -116,7 +116,7 @@ int main(int argc, char *argv[]) { printf("%d wal files are written\n", total); int64_t index = 0; - char name[256]; + char name[256]; while (1) { int code = walGetWalFile(pWal, name, &index); From 08c82ac39697844ac278279c04ba354f37da43b0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 30 Oct 2020 22:56:24 +0800 Subject: [PATCH 05/12] TD-1846 --- src/wal/inc/walInt.h | 4 ++ src/wal/src/walUtil.c | 85 ++++++++++++++++++++++++ src/wal/src/walWrite.c | 145 ++++++++++++++--------------------------- 3 files changed, 139 insertions(+), 95 deletions(-) create mode 100644 src/wal/src/walUtil.c diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index 8b5784d1ce..74a0184c28 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -53,6 +53,10 @@ typedef struct { pthread_mutex_t mutex; } SWal; +// util +int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); +int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); + #ifdef __cplusplus } #endif diff --git a/src/wal/src/walUtil.c b/src/wal/src/walUtil.c new file mode 100644 index 0000000000..b190c703bd --- /dev/null +++ b/src/wal/src/walUtil.c @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "walInt.h" + +int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) { + int64_t curFileId = *nextFileId; + int64_t nearFileId = INT64_MAX; + + DIR *dir = opendir(pWal->path); + if (dir == NULL) { + wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + return -1; + } + + struct dirent *ent; + while ((ent = readdir(dir)) != NULL) { + char *name = ent->d_name; + + if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { + int64_t id = atoll(name + WAL_PREFIX_LEN); + if (id <= curFileId) continue; + + if (id < nearFileId) { + nearFileId = id; + } + } + } + closedir(dir); + + if (nearFileId == INT64_MAX) return -1; + + *nextFileId = nearFileId; + wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " nextFileId:%" PRId64, pWal->vgId, pWal->path, curFileId, *nextFileId); + + return 0; +} + +int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId) { + int64_t nearFileId = INT64_MAX; + + DIR *dir = opendir(pWal->path); + if (dir == NULL) { + wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + return -1; + } + + struct dirent *ent; + while ((ent = readdir(dir)) != NULL) { + char *name = ent->d_name; + + if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { + int64_t id = atoll(name + WAL_PREFIX_LEN); + if (id >= curFileId) continue; + + minDiff--; + if (id < nearFileId) { + nearFileId = id; + } + } + } + closedir(dir); + + if (nearFileId == INT64_MAX) return -1; + if (minDiff > 0) return -1; + + *oldFileId = nearFileId; + wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " oldFildId:%" PRId64, pWal->vgId, pWal->path, curFileId, *oldFileId); + + return 0; +} \ No newline at end of file diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 70bcc49593..54efdc1ce7 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -18,13 +18,10 @@ #include "talloc.h" #include "taoserror.h" #include "tchecksum.h" -#include "tutil.h" -#include "tqueue.h" #include "twal.h" #include "walInt.h" static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name); -static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId, char *nextFileName); int32_t walRenew(void *handle) { if (handle == NULL) return 0; @@ -39,12 +36,7 @@ int32_t walRenew(void *handle) { wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name); } - int64_t lastId = pWal->fileId; - if (pWal->keep) { - pWal->fileId = 0; - } else { - pWal->fileId = taosGetTimestampUs(); - } + pWal->fileId = (pWal->keep ? 0 : taosGetTimestampUs()); snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); @@ -56,14 +48,18 @@ int32_t walRenew(void *handle) { wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name); } - if (!pWal->keep && lastId != -1) { - // remove last wal file - char name[WAL_FILE_LEN]; - snprintf(name, sizeof(name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, lastId); - if (remove(name) < 0) { - wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, name, strerror(errno)); - } else { - wDebug("vgId:%d, file:%s, it is removed", pWal->vgId, name); + if (!pWal->keep) { + // remove the oldest wal file + int64_t oldFileId = -1; + if (walGetOldFile(pWal, pWal->fileId, 2, &oldFileId) == 0) { + char walName[WAL_FILE_LEN] = {0}; + snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId); + + if (remove(walName) < 0) { + wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno)); + } else { + wDebug("vgId:%d, file:%s, it is removed", pWal->vgId, walName); + } } } @@ -115,54 +111,47 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void * SWal * pWal = handle; int32_t count = 0; + int32_t code = 0; + int64_t fileId = -1; - DIR *dir = opendir(pWal->path); - if (dir == NULL && errno == ENOENT) return 0; - if (dir == NULL) return TAOS_SYSTEM_ERROR(errno); + while ((code = walGetNextFile(pWal, &fileId)) >= 0) { + if (fileId == pWal->fileId) continue; - struct dirent *ent; - while ((ent = readdir(dir)) != NULL) { - char *fileName = ent->d_name; + char walName[WAL_FILE_LEN]; + snprintf(walName, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); - if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { - int64_t fileId = atoll(fileName + WAL_PREFIX_LEN); - if (fileId == pWal->fileId) continue; - - char walName[WAL_FILE_LEN]; - snprintf(walName, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); - - wDebug("vgId:%d, file:%s, will be restored", pWal->vgId, walName); - int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, walName); - if (code != TSDB_CODE_SUCCESS) continue; - - - if (!pWal->keep) { - wDebug("vgId:%d, file:%s, restore success, remove this file", pWal->vgId, walName); - remove(walName); - } else { - wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName); - } - - count++; + wDebug("vgId:%d, file:%s, will be restored", pWal->vgId, walName); + int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, walName); + if (code != TSDB_CODE_SUCCESS) { + wDebug("vgId:%d, file:%s, failed to restore since %s", pWal->vgId, walName, tstrerror(code)); + continue; } - } - closedir(dir); - if (pWal->keep) { - if (count == 0) { - wDebug("vgId:%d, file:%s not exist, renew it", pWal->vgId, pWal->name); - return walRenew(pWal); + if (!pWal->keep) { + wDebug("vgId:%d, file:%s, restore success, remove this file", pWal->vgId, walName); + remove(walName); } else { - // open the existing WAL file in append mode - pWal->fileId = 0; - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); - pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); - if (pWal->fd < 0) { - wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); - } - wDebug("vgId:%d, file:%s open success", pWal->vgId, pWal->name); + wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName); } + + count++; + } + + if (!pWal->keep) return TSDB_CODE_SUCCESS; + + if (count == 0) { + wDebug("vgId:%d, file:%s not exist, renew it", pWal->vgId, pWal->name); + return walRenew(pWal); + } else { + // open the existing WAL file in append mode + pWal->fileId = 0; + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); + pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); + if (pWal->fd < 0) { + wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); + return TAOS_SYSTEM_ERROR(errno); + } + wDebug("vgId:%d, file:%s open success", pWal->vgId, pWal->name); } return TSDB_CODE_SUCCESS; @@ -173,8 +162,9 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { SWal *pWal = handle; pthread_mutex_lock(&(pWal->mutex)); - int32_t code = walGetNextFile(pWal, *fileId, fileId, fileName); - if (code == 0) { + int32_t code = walGetNextFile(pWal, fileId); + if (code >= 0) { + sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId); code = (*fileId == pWal->fileId) ? 0 : 1; } pthread_mutex_unlock(&(pWal->mutex)); @@ -276,38 +266,3 @@ int64_t walGetVersion(twalh param) { return pWal->version; } - -static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId, char *nextFileName) { - int64_t nearFileId = INT64_MAX; - char nearFileName[WAL_FILE_LEN] = {0}; - - DIR *dir = opendir(pWal->path); - if (dir == NULL) { - wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); - return -1; - } - - struct dirent *ent; - while ((ent = readdir(dir)) != NULL) { - char *fileName = ent->d_name; - - if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { - int64_t fileId = atoll(fileName + WAL_PREFIX_LEN); - if (fileId <= lastFileId) continue; - - if (fileId < nearFileId) { - nearFileId = fileId; - tstrncpy(nearFileName, fileName, WAL_FILE_LEN); - } - } - } - closedir(dir); - - if (nearFileId == INT64_MAX) return -1; - - *nexFileId = nearFileId; - tstrncpy(nextFileName, nearFileName, WAL_FILE_LEN); - wTrace("vgId:%d, path:%s, lastfile %" PRId64 ", nextfile is %s", pWal->vgId, pWal->path, lastFileId, nextFileName); - - return 0; -} From 89789920fe81c632532dfc6c12793b2fe2c3da97 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 30 Oct 2020 23:38:07 +0800 Subject: [PATCH 06/12] TD-1846 --- src/wal/inc/walInt.h | 4 ++-- src/wal/src/walMgmt.c | 9 +++------ src/wal/src/walUtil.c | 2 +- src/wal/src/walWrite.c | 8 +++++++- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index 74a0184c28..319b652dc7 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -38,22 +38,22 @@ extern int32_t wDebugFlag; #define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_FILE_LEN (TSDB_FILENAME_LEN + 32) +#define WAL_FILE_NUM 3 typedef struct { uint64_t version; + int64_t fileId; int32_t vgId; int32_t fd; int32_t keep; int32_t level; int32_t fsyncPeriod; int32_t fsyncSeq; - int64_t fileId; char path[WAL_PATH_LEN]; char name[WAL_FILE_LEN]; pthread_mutex_t mutex; } SWal; -// util int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 72c0ef21a2..52a8d9dbf0 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -18,7 +18,6 @@ #include "taoserror.h" #include "talloc.h" #include "tref.h" -#include "tutil.h" #include "twal.h" #include "walInt.h" @@ -135,9 +134,8 @@ void walClose(void *handle) { static int32_t walInitObj(SWal *pWal) { if (taosMkDir(pWal->path, 0755) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno)); - return terrno; + return TAOS_SYSTEM_ERROR(errno); } if (pWal->keep) { @@ -147,9 +145,8 @@ static int32_t walInitObj(SWal *pWal) { walRenew(pWal); if (pWal && pWal->fd < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%s, failed to open file since %s", pWal->vgId, pWal->path, strerror(errno)); - return terrno; + return TAOS_SYSTEM_ERROR(errno); } wDebug("vgId:%d, file is initialized", pWal->vgId); @@ -191,7 +188,7 @@ static void walFsyncAll() { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); int32_t code = fsync(pWal->fd); if (code != 0) { - wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(code)); + wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code)); } } pWal = taosIterateRef(tsWal.refId, pWal); diff --git a/src/wal/src/walUtil.c b/src/wal/src/walUtil.c index b190c703bd..c4ef6df195 100644 --- a/src/wal/src/walUtil.c +++ b/src/wal/src/walUtil.c @@ -82,4 +82,4 @@ int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *o wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " oldFildId:%" PRId64, pWal->vgId, pWal->path, curFileId, *oldFileId); return 0; -} \ No newline at end of file +} diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 54efdc1ce7..39a7edb240 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -51,7 +51,7 @@ int32_t walRenew(void *handle) { if (!pWal->keep) { // remove the oldest wal file int64_t oldFileId = -1; - if (walGetOldFile(pWal, pWal->fileId, 2, &oldFileId) == 0) { + if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) { char walName[WAL_FILE_LEN] = {0}; snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId); @@ -161,12 +161,18 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { if (handle == NULL) return -1; SWal *pWal = handle; + // for keep + if (*fileId == 0) *fileId = -1; + pthread_mutex_lock(&(pWal->mutex)); + int32_t code = walGetNextFile(pWal, fileId); if (code >= 0) { sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId); code = (*fileId == pWal->fileId) ? 0 : 1; } + + wTrace("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->fileId, *fileId); pthread_mutex_unlock(&(pWal->mutex)); return code; From b77785cf83cd5cb7ede1bcf1fd9e23d46e70cb69 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 31 Oct 2020 11:29:29 +0800 Subject: [PATCH 07/12] scripts --- tests/script/unique/dnode/offline1.sim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/unique/dnode/offline1.sim b/tests/script/unique/dnode/offline1.sim index 02d03dee97..beebbfda60 100644 --- a/tests/script/unique/dnode/offline1.sim +++ b/tests/script/unique/dnode/offline1.sim @@ -49,7 +49,7 @@ print dnode1 $data4_2 if $data4_1 != ready then return -1 endi -if $data4_2 != offline then +if $data4_2 == ready then return -1 endi From 395bdd2c977d44335a14fad843c17b2008a2c778 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 31 Oct 2020 11:46:18 +0800 Subject: [PATCH 08/12] compile error in windows --- src/util/inc/talloc.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/util/inc/talloc.h b/src/util/inc/talloc.h index a59e88cf13..1fc4d759b0 100644 --- a/src/util/inc/talloc.h +++ b/src/util/inc/talloc.h @@ -20,6 +20,8 @@ extern "C" { #endif +#define TSDB_USE_SYS_MEM + #ifdef TSDB_USE_SYS_MEM #define tmalloc(size) malloc(size) #define tcalloc(size) calloc(1, size) From 249e599bd1e05795f2547e1de49ced16142e9ec1 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 31 Oct 2020 11:56:37 +0800 Subject: [PATCH 09/12] compile error in windows --- src/util/src/talloc.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/util/src/talloc.c b/src/util/src/talloc.c index 6f23b39393..d3d8ee8116 100644 --- a/src/util/src/talloc.c +++ b/src/util/src/talloc.c @@ -17,8 +17,10 @@ #include "os.h" #include "taoserror.h" #include "tulog.h" +#include "talloc.h" #define TSDB_HAVE_MEMALIGN +#ifndef TSDB_USE_SYS_MEM void *tmalloc(int32_t size) { void *p = malloc(size); @@ -74,3 +76,4 @@ void *tmemalign(int32_t alignment, int32_t size) { void *tmemalign(int32_t alignment, int32_t size) { return tmalloc(size); } #endif +#endif \ No newline at end of file From d103af8ff825fcb001a35922bf42c7a3c27ad457 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 31 Oct 2020 16:14:27 +0800 Subject: [PATCH 10/12] TD-1872 according to the suggestion of review --- src/wal/inc/walInt.h | 3 ++- src/wal/src/walMgmt.c | 21 +++++++++++------ src/wal/src/walUtil.c | 53 ++++++++++++++++++++++++++++++++++-------- src/wal/src/walWrite.c | 11 ++++++++- 4 files changed, 69 insertions(+), 19 deletions(-) diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index 319b652dc7..511f8320f0 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -32,7 +32,7 @@ extern int32_t wDebugFlag; #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define WAL_PREFIX "wal" -#define WAL_PREFIX_LEN 3 +#define WAL_PREFIX_LEN 2 #define WAL_REFRESH_MS 1000 #define WAL_MAX_SIZE (1024 * 1024) #define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) @@ -56,6 +56,7 @@ typedef struct { int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); +int32_t walGetNewFile(SWal *pWal, int64_t *newFileId); #ifdef __cplusplus } diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 52a8d9dbf0..15f74370ba 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -23,7 +23,6 @@ typedef struct { int32_t refId; - int32_t num; int32_t seq; int8_t stop; pthread_t thread; @@ -85,7 +84,6 @@ void *walOpen(char *path, SWalCfg *pCfg) { return NULL; } - atomic_add_fetch_32(&tsWal.num, 1); wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod); return pWal; @@ -116,19 +114,28 @@ void walClose(void *handle) { if (handle == NULL) return; SWal *pWal = handle; + pthread_mutex_lock(&pWal->mutex); + taosClose(pWal->fd); if (!pWal->keep) { - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); - if (remove(pWal->name) < 0) { - wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); - } else { - wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name); + int64_t fileId = -1; + while (walGetNextFile(pWal, &fileId) >= 0) { + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); + + if (fileId == pWal->fileId) { + wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name); + } else if (remove(pWal->name) < 0) { + wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); + } else { + wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name); + } } } else { wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name); } + pthread_mutex_unlock(&pWal->mutex); taosRemoveRef(tsWal.refId, pWal); } diff --git a/src/wal/src/walUtil.c b/src/wal/src/walUtil.c index c4ef6df195..7d79a39f85 100644 --- a/src/wal/src/walUtil.c +++ b/src/wal/src/walUtil.c @@ -19,7 +19,7 @@ int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) { int64_t curFileId = *nextFileId; - int64_t nearFileId = INT64_MAX; + int64_t minFileId = INT64_MAX; DIR *dir = opendir(pWal->path); if (dir == NULL) { @@ -35,23 +35,23 @@ int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) { int64_t id = atoll(name + WAL_PREFIX_LEN); if (id <= curFileId) continue; - if (id < nearFileId) { - nearFileId = id; + if (id < minFileId) { + minFileId = id; } } } closedir(dir); - if (nearFileId == INT64_MAX) return -1; + if (minFileId == INT64_MAX) return -1; - *nextFileId = nearFileId; + *nextFileId = minFileId; wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " nextFileId:%" PRId64, pWal->vgId, pWal->path, curFileId, *nextFileId); return 0; } int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId) { - int64_t nearFileId = INT64_MAX; + int64_t minFileId = INT64_MAX; DIR *dir = opendir(pWal->path); if (dir == NULL) { @@ -68,18 +68,51 @@ int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *o if (id >= curFileId) continue; minDiff--; - if (id < nearFileId) { - nearFileId = id; + if (id < minFileId) { + minFileId = id; } } } closedir(dir); - if (nearFileId == INT64_MAX) return -1; + if (minFileId == INT64_MAX) return -1; if (minDiff > 0) return -1; - *oldFileId = nearFileId; + *oldFileId = minFileId; wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " oldFildId:%" PRId64, pWal->vgId, pWal->path, curFileId, *oldFileId); return 0; } + +int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) { + int64_t maxFileId = INT64_MIN; + + DIR *dir = opendir(pWal->path); + if (dir == NULL) { + wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + return -1; + } + + struct dirent *ent; + while ((ent = readdir(dir)) != NULL) { + char *name = ent->d_name; + + if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { + int64_t id = atoll(name + WAL_PREFIX_LEN); + if (id > maxFileId) { + maxFileId = id; + } + } + } + closedir(dir); + + if (maxFileId == INT64_MAX) { + *newFileId = 0; + } else { + *newFileId = maxFileId; + } + + wTrace("vgId:%d, path:%s, newFileId:%" PRId64, pWal->vgId, pWal->path, *newFileId); + + return 0; +} \ No newline at end of file diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 39a7edb240..68ea5e2b72 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -36,7 +36,12 @@ int32_t walRenew(void *handle) { wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name); } - pWal->fileId = (pWal->keep ? 0 : taosGetTimestampUs()); + if (pWal->keep) { + pWal->fileId = 0; + } else { + if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0; + pWal->fileId++; + } snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); @@ -82,6 +87,8 @@ int32_t walWrite(void *handle, SWalHead *pHead) { taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); int32_t contLen = pHead->len + sizeof(SWalHead); + pthread_mutex_lock(&pWal->mutex); + if (taosTWrite(pWal->fd, pHead, contLen) != contLen) { code = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); @@ -90,6 +97,8 @@ int32_t walWrite(void *handle, SWalHead *pHead) { wTrace("vgId:%d, write version:%" PRId64 ", fileId:%" PRId64, pWal->vgId, pWal->version, pWal->fileId); } + pthread_mutex_unlock(&pWal->mutex); + ASSERT(contLen == pHead->len + sizeof(SWalHead)); return code; From c89fd5ae0e728d71466c583f899f0b75ec39471d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 31 Oct 2020 18:28:58 +0800 Subject: [PATCH 11/12] TD-1872 --- src/wal/src/walUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/wal/src/walUtil.c b/src/wal/src/walUtil.c index 7d79a39f85..e4d9a555b3 100644 --- a/src/wal/src/walUtil.c +++ b/src/wal/src/walUtil.c @@ -106,7 +106,7 @@ int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) { } closedir(dir); - if (maxFileId == INT64_MAX) { + if (maxFileId == INT64_MIN) { *newFileId = 0; } else { *newFileId = maxFileId; From 65a371959296ec04c77549d72ce6fb2ae582bc89 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 31 Oct 2020 22:48:10 +0800 Subject: [PATCH 12/12] TD-1872 --- src/wal/inc/walInt.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index 511f8320f0..7e731d44db 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -32,7 +32,7 @@ extern int32_t wDebugFlag; #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define WAL_PREFIX "wal" -#define WAL_PREFIX_LEN 2 +#define WAL_PREFIX_LEN 3 #define WAL_REFRESH_MS 1000 #define WAL_MAX_SIZE (1024 * 1024) #define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))