Merge pull request #1643 from taosdata/enhance/sdbWal
optimize the support for SDB
This commit is contained in:
commit
0ca5397db8
|
@ -35,6 +35,7 @@
|
|||
#define wPrint(...) {tprintf("WAL ", 255, __VA_ARGS__);}
|
||||
|
||||
typedef struct {
|
||||
uint64_t version;
|
||||
int fd;
|
||||
int keep;
|
||||
int level;
|
||||
|
@ -50,9 +51,8 @@ int wDebugFlag = 135;
|
|||
|
||||
static uint32_t walSignature = 0xFAFBFDFE;
|
||||
static int walHandleExistingFiles(const char *path);
|
||||
static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp);
|
||||
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp);
|
||||
static int walRemoveWalFiles(const char *path);
|
||||
static int walMoveOldWalFilesBack(const char *path);
|
||||
|
||||
void *walOpen(const char *path, const SWalCfg *pCfg) {
|
||||
SWal *pWal = calloc(sizeof(SWal), 1);
|
||||
|
@ -69,6 +69,8 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
|
|||
|
||||
if (access(path, F_OK) != 0) mkdir(path, 0755);
|
||||
|
||||
if (pCfg->keep == 1) return pWal;
|
||||
|
||||
if (walHandleExistingFiles(path) == 0)
|
||||
walRenew(pWal);
|
||||
|
||||
|
@ -155,6 +157,7 @@ int walWrite(void *handle, SWalHead *pHead) {
|
|||
|
||||
// no wal
|
||||
if (pWal->level == TAOS_WAL_NOLOG) return 0;
|
||||
if (pHead->version <= pWal->version) return 0;
|
||||
|
||||
pHead->signature = walSignature;
|
||||
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
|
||||
|
@ -163,7 +166,9 @@ int walWrite(void *handle, SWalHead *pHead) {
|
|||
if(write(pWal->fd, pHead, contLen) != contLen) {
|
||||
wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
|
||||
code = -1;
|
||||
}
|
||||
} else {
|
||||
pWal->version = pHead->version;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -185,7 +190,10 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
|
|||
|
||||
int plen = strlen(walPrefix);
|
||||
char opath[TSDB_FILENAME_LEN+5];
|
||||
sprintf(opath, "%s/old", pWal->path);
|
||||
|
||||
int slen = sprintf(opath, "%s", pWal->path);
|
||||
if ( pWal->keep == 0)
|
||||
strcpy(opath+slen, "/old");
|
||||
|
||||
// is there old directory?
|
||||
if (access(opath, F_OK)) return 0;
|
||||
|
@ -200,6 +208,8 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
|
|||
}
|
||||
}
|
||||
|
||||
if (count == 0) return 0;
|
||||
|
||||
if ( count != (maxId-minId+1) ) {
|
||||
wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId);
|
||||
code = -1;
|
||||
|
@ -207,21 +217,29 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
|
|||
wTrace("wal:%s, %d files will be restored", opath, count);
|
||||
|
||||
for (index = minId; index<=maxId; ++index) {
|
||||
sprintf(pWal->name, "%s/old/%s%d", pWal->path, walPrefix, index);
|
||||
code = walRestoreWalFile(pWal->name, pVnode, writeFp);
|
||||
sprintf(pWal->name, "%s/%s%d", opath, walPrefix, index);
|
||||
code = walRestoreWalFile(pWal, pVnode, writeFp);
|
||||
if (code < 0) break;
|
||||
}
|
||||
}
|
||||
|
||||
if (code == 0) {
|
||||
if (pWal->keep) {
|
||||
code = walMoveOldWalFilesBack(pWal->path);
|
||||
} else {
|
||||
if (pWal->keep == 0) {
|
||||
code = walRemoveWalFiles(opath);
|
||||
}
|
||||
if (code == 0) {
|
||||
if (remove(opath) < 0) {
|
||||
wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
|
||||
if (code == 0) {
|
||||
if (remove(opath) < 0) {
|
||||
wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
|
||||
code = -1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// open the existing WAL file in append mode
|
||||
pWal->num = count;
|
||||
pWal->id = maxId;
|
||||
sprintf(pWal->name, "%s/%s%d", opath, walPrefix, maxId);
|
||||
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
if (pWal->fd < 0) {
|
||||
wError("wal:%s, failed to open file(%s)", pWal->name, strerror(errno));
|
||||
code = -1;
|
||||
}
|
||||
}
|
||||
|
@ -257,8 +275,9 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp) {
|
||||
int code = 0;
|
||||
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
|
||||
int code = 0;
|
||||
char *name = pWal->name;
|
||||
|
||||
char *buffer = malloc(1024000); // size for one record
|
||||
if (buffer == NULL) return -1;
|
||||
|
@ -294,10 +313,11 @@ static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp)
|
|||
break;
|
||||
}
|
||||
|
||||
// write into queue
|
||||
if (pWal->keep) pWal->version = pHead->version;
|
||||
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
|
||||
}
|
||||
|
||||
close(fd);
|
||||
free(buffer);
|
||||
|
||||
return code;
|
||||
|
@ -370,40 +390,3 @@ static int walRemoveWalFiles(const char *path) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int walMoveOldWalFilesBack(const char *path) {
|
||||
char oname[TSDB_FILENAME_LEN * 3];
|
||||
char nname[TSDB_FILENAME_LEN * 3];
|
||||
char opath[TSDB_FILENAME_LEN];
|
||||
struct dirent *ent;
|
||||
int plen = strlen(walPrefix);
|
||||
int code = 0;
|
||||
|
||||
sprintf(opath, "%s/old", path);
|
||||
|
||||
if (access(opath, F_OK) == 0) {
|
||||
// move all old files to wal directory
|
||||
int count = 0;
|
||||
|
||||
DIR *dir = opendir(opath);
|
||||
while ((ent = readdir(dir))!= NULL) {
|
||||
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
|
||||
sprintf(oname, "%s/%s", opath, ent->d_name);
|
||||
sprintf(nname, "%s/%s", path, ent->d_name);
|
||||
if (rename(oname, nname) < 0) {
|
||||
wError("wal:%s, failed to move to new:%s", oname, nname);
|
||||
code = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
wTrace("wal:%s, %d old files are move back for keep option is set", path, count);
|
||||
closedir(dir);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -29,8 +29,6 @@ int writeToQueue(void *pVnode, void *data, int type) {
|
|||
ver = pHead->version;
|
||||
|
||||
walWrite(pWal, pHead);
|
||||
|
||||
free(pHead);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -42,6 +40,7 @@ int main(int argc, char *argv[]) {
|
|||
int total = 5;
|
||||
int rows = 10000;
|
||||
int size = 128;
|
||||
int keep = 0;
|
||||
|
||||
for (int i=1; i<argc; ++i) {
|
||||
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
||||
|
@ -52,6 +51,8 @@ int main(int argc, char *argv[]) {
|
|||
level = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-r")==0 && i < argc-1) {
|
||||
rows = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
|
||||
keep = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
||||
total = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
|
||||
|
@ -67,6 +68,7 @@ int main(int argc, char *argv[]) {
|
|||
printf(" [-l level]: log level, default is:%d\n", level);
|
||||
printf(" [-t total]: total wal files, default is:%d\n", total);
|
||||
printf(" [-r rows]: rows of records per wal file, default is:%d\n", rows);
|
||||
printf(" [-k keep]: keep the wal after closing, default is:%d\n", keep);
|
||||
printf(" [-v version]: initial version, default is:%ld\n", ver);
|
||||
printf(" [-d debugFlag]: debug flag, default:%d\n", ddebugFlag);
|
||||
printf(" [-h help]: print out this help\n\n");
|
||||
|
@ -79,6 +81,7 @@ int main(int argc, char *argv[]) {
|
|||
SWalCfg walCfg;
|
||||
walCfg.commitLog = level;
|
||||
walCfg.wals = max;
|
||||
walCfg.keep = keep;
|
||||
|
||||
pWal = walOpen(path, &walCfg);
|
||||
if (pWal == NULL) {
|
||||
|
|
Loading…
Reference in New Issue