first draft for WAL
This commit is contained in:
parent
92fa341fc5
commit
87bf983d48
|
@ -1,4 +1,15 @@
|
|||
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||
PROJECT(TDengine)
|
||||
|
||||
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||
INCLUDE_DIRECTORIES(inc)
|
||||
|
||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
|
||||
|
||||
ADD_LIBRARY(wal ${SRC})
|
||||
TARGET_INCLUDE_DIRECTORIES(wal PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/inc)
|
||||
ADD_LIBRARY(twal ${SRC})
|
||||
TARGET_LINK_LIBRARIES(twal tutil)
|
||||
|
||||
ADD_SUBDIRECTORY(test)
|
||||
|
||||
|
|
|
@ -14,19 +14,36 @@
|
|||
*/
|
||||
#ifndef _TD_WAL_H_
|
||||
#define _TD_WAL_H_
|
||||
#include <stdint.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef void walh; // WAL HANDLE
|
||||
#define TAOS_WAL_NOLOG 0
|
||||
#define TAOS_WAL_WRITE 1
|
||||
#define TAOS_WAL_FSYNC 2
|
||||
|
||||
typedef struct {
|
||||
uint32_t signature;
|
||||
uint32_t cksum;
|
||||
int8_t msgType;
|
||||
int8_t reserved[3];
|
||||
int32_t len;
|
||||
uint64_t version;
|
||||
char cont[];
|
||||
} SWalHead;
|
||||
|
||||
typedef void* twal_h; // WAL HANDLE
|
||||
|
||||
twal_h walOpen(char *path, int max, int level);
|
||||
void walClose(twal_h);
|
||||
int walRenew(twal_h);
|
||||
int walWrite(twal_h, SWalHead *);
|
||||
void walFsync(twal_h);
|
||||
int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pWalHead));
|
||||
|
||||
extern int wDebugFlag;
|
||||
|
||||
walh *vnodeOpenWal(int vnode, uint8_t op);
|
||||
int vnodeCloseWal(walh *pWal);
|
||||
int vnodeRenewWal(walh *pWal);
|
||||
int vnodeWriteWal(walh *pWal, void *cont, int contLen);
|
||||
int vnodeSyncWal(walh *pWal);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include <stdlib.h>
|
||||
|
||||
#include "vnodeWal.h"
|
||||
|
||||
typedef struct {
|
||||
/* TODO */
|
||||
} SWal;
|
||||
|
||||
walh *vnodeOpenWal(int vnode, uint8_t op) { return NULL; }
|
||||
int vnodeCloseWal(walh *pWal) { return 0; }
|
||||
int vnodeRenewWal(walh *pWal) { return 0; }
|
||||
int vnodeWriteWal(walh *pWal, void *cont, int contLen) { return 0; }
|
||||
int vnodeSyncWal(walh *pWal) { return 0; }
|
|
@ -0,0 +1,316 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/types.h>
|
||||
#include <dirent.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
#include "os.h"
|
||||
#include "tlog.h"
|
||||
#include "tutil.h"
|
||||
#include "twal.h"
|
||||
|
||||
#define walPrefix "wal"
|
||||
#define wError(...) if (wDebugFlag & DEBUG_ERROR) {tprintf("ERROR WAL ", wDebugFlag, __VA_ARGS__);}
|
||||
#define wWarn(...) if (wDebugFlag & DEBUG_WARN) {tprintf("WARN WAL ", wDebugFlag, __VA_ARGS__);}
|
||||
#define wTrace(...) if (wDebugFlag & DEBUG_TRACE) {tprintf("WAL ", wDebugFlag, __VA_ARGS__);}
|
||||
#define wPrint(...) {tprintf("WAL ", 255, __VA_ARGS__);}
|
||||
|
||||
typedef struct {
|
||||
int fd;
|
||||
int level;
|
||||
int max; // maximum number of wal files
|
||||
uint32_t id; // increase continuously
|
||||
int num; // number of wal files
|
||||
char path[TSDB_FILENAME_LEN];
|
||||
char name[TSDB_FILENAME_LEN];
|
||||
} SWal;
|
||||
|
||||
int wDebugFlag = 135;
|
||||
|
||||
static uint32_t walSignature = 0xFAFBFDFE;
|
||||
static int walHandleExistingFiles(char *path);
|
||||
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *));
|
||||
static int walRemoveWalFiles(char *path);
|
||||
|
||||
void *walOpen(char *path, int max, int level) {
|
||||
SWal *pWal = calloc(sizeof(SWal), 1);
|
||||
if (pWal == NULL) return NULL;
|
||||
|
||||
pWal->fd = -1;
|
||||
pWal->max = max;
|
||||
pWal->id = 0;
|
||||
pWal->num = 0;
|
||||
pWal->level = level;
|
||||
strcpy(pWal->path, path);
|
||||
|
||||
if (access(path, F_OK) != 0) mkdir(path, 0755);
|
||||
|
||||
if (walHandleExistingFiles(path) == 0)
|
||||
walRenew(pWal);
|
||||
|
||||
if (pWal->fd <0) {
|
||||
wError("wal:%s, failed to open", path);
|
||||
free(pWal);
|
||||
pWal = NULL;
|
||||
}
|
||||
|
||||
return pWal;
|
||||
}
|
||||
|
||||
void walClose(void *handle) {
|
||||
|
||||
SWal *pWal = (SWal *)handle;
|
||||
|
||||
close(pWal->fd);
|
||||
|
||||
// remove all files in the directory
|
||||
for (int i=0; i<pWal->num; ++i) {
|
||||
sprintf(pWal->name, "%s/%s%010d", pWal->path, walPrefix, pWal->id-i);
|
||||
if (remove(pWal->name) <0) {
|
||||
wError("wal:%s, failed to remove", pWal->name);
|
||||
} else {
|
||||
wTrace("wal:%s, it is removed", pWal->name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int walRenew(twal_h handle) {
|
||||
SWal *pWal = (SWal *)handle;
|
||||
|
||||
if (pWal->fd >=0) {
|
||||
close(pWal->fd);
|
||||
pWal->id++;
|
||||
wTrace("wal:%s, it is closed", pWal->name);
|
||||
}
|
||||
|
||||
sprintf(pWal->name, "%s/%s%010d", pWal->path, walPrefix, pWal->id);
|
||||
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
if (pWal->fd < 0) {
|
||||
wError("wal:%d, failed to open(%s)", pWal->name, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
wTrace("wal:%s, it is open", pWal->name);
|
||||
|
||||
pWal->num++;
|
||||
if (pWal->num > pWal->max) {
|
||||
// remove the oldest wal file
|
||||
char name[TSDB_FILENAME_LEN];
|
||||
sprintf(name, "%s/%s%010d", pWal->path, walPrefix, pWal->id - pWal->max);
|
||||
if (remove(name) <0) {
|
||||
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
|
||||
} else {
|
||||
wTrace("wal:%s, it is removed", name);
|
||||
}
|
||||
|
||||
pWal->num--;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int walWrite(void *handle, SWalHead *pHead) {
|
||||
SWal *pWal = (SWal *)handle;
|
||||
int code = 0;
|
||||
|
||||
// no wal
|
||||
if (pWal->level == TAOS_WAL_NOLOG) return 0;
|
||||
|
||||
pHead->signature = walSignature;
|
||||
int contLen = pHead->len + sizeof(SWalHead);
|
||||
|
||||
if(write(pWal->fd, pHead, contLen) != contLen) {
|
||||
wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
|
||||
code = -1;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void walFsync(void *handle) {
|
||||
|
||||
SWal *pWal = (SWal *)handle;
|
||||
|
||||
if (pWal->level == TAOS_WAL_FSYNC)
|
||||
fsync(pWal->fd);
|
||||
}
|
||||
|
||||
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) {
|
||||
SWal *pWal = (SWal *)handle;
|
||||
int code = 0;
|
||||
struct dirent *ent;
|
||||
int count = 0;
|
||||
uint32_t maxId = 0, minId = -1, index =0;
|
||||
|
||||
int plen = strlen(walPrefix);
|
||||
char opath[TSDB_FILENAME_LEN];
|
||||
sprintf(opath, "%s/old", pWal->path);
|
||||
|
||||
// is there old directory?
|
||||
if (access(opath, F_OK)) return 0;
|
||||
|
||||
DIR *dir = opendir(opath);
|
||||
while ((ent = readdir(dir))!= NULL) {
|
||||
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
|
||||
index = atol(ent->d_name + plen);
|
||||
if (index > maxId) maxId = index;
|
||||
if (index < minId) minId = index;
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
if ( count != (maxId-minId+1) ) {
|
||||
wError("wal:%s, messed up, count:%d max:%ld min:%ld", opath, count, maxId, minId);
|
||||
code = -1;
|
||||
} else {
|
||||
wTrace("wal:%s, %d files will be restored", opath, count);
|
||||
|
||||
for (index = minId; index<=maxId; ++index) {
|
||||
sprintf(pWal->name, "%s/old/%s%010d", pWal->path, walPrefix, index);
|
||||
code = walRestoreWalFile(pWal->name, pVnode, writeFp);
|
||||
if (code < 0) break;
|
||||
}
|
||||
}
|
||||
|
||||
if (code == 0) {
|
||||
code = walRemoveWalFiles(opath);
|
||||
if (code == 0) {
|
||||
if (remove(opath) < 0) {
|
||||
wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
|
||||
code = -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
closedir(dir);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)) {
|
||||
SWalHead walHead;
|
||||
int code = -1;
|
||||
|
||||
int fd = open(name, O_RDONLY);
|
||||
if (fd < 0) {
|
||||
wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
wTrace("wal:%s, start to restore", name);
|
||||
|
||||
while (1) {
|
||||
int ret = read(fd, &walHead, sizeof(walHead));
|
||||
if ( ret == 0) { code = 0; break;}
|
||||
|
||||
if (ret != sizeof(walHead)) {
|
||||
wError("wal:%s, failed to read(%s)", name, strerror(errno));
|
||||
break;
|
||||
}
|
||||
|
||||
if (walHead.signature != walSignature) {
|
||||
wError("wal:%s, file is messed up, signature:", name, walHead.signature);
|
||||
break;
|
||||
}
|
||||
|
||||
char *buffer = malloc(sizeof(SWalHead) + walHead.len);
|
||||
memcpy(buffer, &walHead, sizeof(walHead));
|
||||
|
||||
ret = read(fd, buffer+sizeof(walHead), walHead.len);
|
||||
if ( ret != walHead.len) {
|
||||
wError("wal:%s, failed to read(%s)", name, strerror(errno));
|
||||
break;
|
||||
}
|
||||
|
||||
// write into queue
|
||||
(*writeFp)(pVnode, buffer);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int walHandleExistingFiles(char *path) {
|
||||
int code = 0;
|
||||
char oname[TSDB_FILENAME_LEN];
|
||||
char nname[TSDB_FILENAME_LEN];
|
||||
char opath[TSDB_FILENAME_LEN];
|
||||
|
||||
sprintf(opath, "%s/old", path);
|
||||
|
||||
struct dirent *ent;
|
||||
DIR *dir = opendir(path);
|
||||
int plen = strlen(walPrefix);
|
||||
|
||||
if (access(opath, F_OK) == 0) {
|
||||
// old directory is there, it means restore process is not finished
|
||||
walRemoveWalFiles(path);
|
||||
|
||||
} else {
|
||||
// move all files to old directory
|
||||
int count = 0;
|
||||
while ((ent = readdir(dir))!= NULL) {
|
||||
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
|
||||
if (access(opath, F_OK) != 0) mkdir(opath, 0755);
|
||||
|
||||
sprintf(oname, "%s/%s", path, ent->d_name);
|
||||
sprintf(nname, "%s/old/%s", path, ent->d_name);
|
||||
if (rename(oname, nname) < 0) {
|
||||
wError("wal:%s, failed to move to new:%s", oname, nname);
|
||||
code = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
wTrace("wal:%s, %d files are moved for restoration", path, count);
|
||||
}
|
||||
|
||||
closedir(dir);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int walRemoveWalFiles(char *path) {
|
||||
int plen = strlen(walPrefix);
|
||||
char name[TSDB_FILENAME_LEN];
|
||||
int code = 0;
|
||||
|
||||
if (access(path, F_OK) != 0) return 0;
|
||||
|
||||
struct dirent *ent;
|
||||
DIR *dir = opendir(path);
|
||||
|
||||
while ((ent = readdir(dir))!= NULL) {
|
||||
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
|
||||
sprintf(name, "%s/%s", path, ent->d_name);
|
||||
if (remove(name) <0) {
|
||||
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
|
||||
code = -1; break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
closedir(dir);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||
PROJECT(TDengine)
|
||||
|
||||
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||
INCLUDE_DIRECTORIES(../inc)
|
||||
|
||||
LIST(APPEND WALTEST_SRC ./waltest.c)
|
||||
ADD_EXECUTABLE(waltest ${WALTEST_SRC})
|
||||
TARGET_LINK_LIBRARIES(waltest twal)
|
||||
|
||||
ENDIF ()
|
||||
|
||||
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
//#define _DEFAULT_SOURCE
|
||||
#include <stdint.h>
|
||||
#include "tlog.h"
|
||||
#include "twal.h"
|
||||
|
||||
int64_t ver = 0;
|
||||
void *pWal = NULL;
|
||||
|
||||
int writeToQueue(void *pVnode, void *data) {
|
||||
SWalHead *pHead = (SWalHead *)data;
|
||||
|
||||
// do nothing
|
||||
if (pHead->version > ver)
|
||||
ver = pHead->version;
|
||||
|
||||
walWrite(pWal, pHead);
|
||||
|
||||
free(data);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
char path[128] = "/home/jhtao/test/wal";
|
||||
int max = 3;
|
||||
int level = 2;
|
||||
int total = 5;
|
||||
int rows = 10000;
|
||||
int size = 128;
|
||||
|
||||
for (int i=1; i<argc; ++i) {
|
||||
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
||||
strcpy(path, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
|
||||
max = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
|
||||
level = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-r")==0 && i < argc-1) {
|
||||
rows = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
||||
total = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
|
||||
size = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-v")==0 && i < argc-1) {
|
||||
ver = atoll(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
||||
ddebugFlag = atoi(argv[++i]);
|
||||
} else {
|
||||
printf("\nusage: %s [options] \n", argv[0]);
|
||||
printf(" [-p path]: wal file path default is:%s\n", path);
|
||||
printf(" [-m max]: max wal files, default is:%d\n", max);
|
||||
printf(" [-l level]: log level, default is:%d\n", level);
|
||||
printf(" [-t total]: total wal files, default is:%d\n", total);
|
||||
printf(" [-r rows]: rows of records per wal file, default is:%d\n", rows);
|
||||
printf(" [-v version]: initial version, default is:%ld\n", ver);
|
||||
printf(" [-d debugFlag]: debug flag, default:%d\n", ddebugFlag);
|
||||
printf(" [-h help]: print out this help\n\n");
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
taosInitLog("wal.log", 100000, 10);
|
||||
|
||||
pWal = walOpen(path, max, level);
|
||||
if (pWal == NULL) {
|
||||
printf("failed to open wal\n");
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
int ret = walRestore(pWal, NULL, writeToQueue);
|
||||
if (ret <0) {
|
||||
printf("failed to restore wal\n");
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
printf("version starts from:%ld\n", ver);
|
||||
|
||||
int contLen = sizeof(SWalHead) + size;
|
||||
SWalHead *pHead = (SWalHead *) malloc(contLen);
|
||||
|
||||
for (int i=0; i<total; ++i) {
|
||||
for (int k=0; k<rows; ++k) {
|
||||
pHead->version = ++ver;
|
||||
walWrite(pWal, pHead);
|
||||
}
|
||||
|
||||
printf("renew a wal, i:%d\n", i);
|
||||
walRenew(pWal);
|
||||
}
|
||||
|
||||
printf("%d wal files are written\n", total);
|
||||
getchar();
|
||||
|
||||
walClose(pWal);
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue