TD-1846
This commit is contained in:
parent
30c7c2c0f6
commit
08c82ac396
|
@ -53,6 +53,10 @@ typedef struct {
|
|||
pthread_mutex_t mutex;
|
||||
} SWal;
|
||||
|
||||
// util
|
||||
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
|
||||
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "walInt.h"
|
||||
|
||||
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) {
|
||||
int64_t curFileId = *nextFileId;
|
||||
int64_t nearFileId = INT64_MAX;
|
||||
|
||||
DIR *dir = opendir(pWal->path);
|
||||
if (dir == NULL) {
|
||||
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct dirent *ent;
|
||||
while ((ent = readdir(dir)) != NULL) {
|
||||
char *name = ent->d_name;
|
||||
|
||||
if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
|
||||
int64_t id = atoll(name + WAL_PREFIX_LEN);
|
||||
if (id <= curFileId) continue;
|
||||
|
||||
if (id < nearFileId) {
|
||||
nearFileId = id;
|
||||
}
|
||||
}
|
||||
}
|
||||
closedir(dir);
|
||||
|
||||
if (nearFileId == INT64_MAX) return -1;
|
||||
|
||||
*nextFileId = nearFileId;
|
||||
wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " nextFileId:%" PRId64, pWal->vgId, pWal->path, curFileId, *nextFileId);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId) {
|
||||
int64_t nearFileId = INT64_MAX;
|
||||
|
||||
DIR *dir = opendir(pWal->path);
|
||||
if (dir == NULL) {
|
||||
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct dirent *ent;
|
||||
while ((ent = readdir(dir)) != NULL) {
|
||||
char *name = ent->d_name;
|
||||
|
||||
if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
|
||||
int64_t id = atoll(name + WAL_PREFIX_LEN);
|
||||
if (id >= curFileId) continue;
|
||||
|
||||
minDiff--;
|
||||
if (id < nearFileId) {
|
||||
nearFileId = id;
|
||||
}
|
||||
}
|
||||
}
|
||||
closedir(dir);
|
||||
|
||||
if (nearFileId == INT64_MAX) return -1;
|
||||
if (minDiff > 0) return -1;
|
||||
|
||||
*oldFileId = nearFileId;
|
||||
wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " oldFildId:%" PRId64, pWal->vgId, pWal->path, curFileId, *oldFileId);
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -18,13 +18,10 @@
|
|||
#include "talloc.h"
|
||||
#include "taoserror.h"
|
||||
#include "tchecksum.h"
|
||||
#include "tutil.h"
|
||||
#include "tqueue.h"
|
||||
#include "twal.h"
|
||||
#include "walInt.h"
|
||||
|
||||
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name);
|
||||
static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId, char *nextFileName);
|
||||
|
||||
int32_t walRenew(void *handle) {
|
||||
if (handle == NULL) return 0;
|
||||
|
@ -39,12 +36,7 @@ int32_t walRenew(void *handle) {
|
|||
wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name);
|
||||
}
|
||||
|
||||
int64_t lastId = pWal->fileId;
|
||||
if (pWal->keep) {
|
||||
pWal->fileId = 0;
|
||||
} else {
|
||||
pWal->fileId = taosGetTimestampUs();
|
||||
}
|
||||
pWal->fileId = (pWal->keep ? 0 : taosGetTimestampUs());
|
||||
|
||||
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
|
||||
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
|
@ -56,14 +48,18 @@ int32_t walRenew(void *handle) {
|
|||
wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name);
|
||||
}
|
||||
|
||||
if (!pWal->keep && lastId != -1) {
|
||||
// remove last wal file
|
||||
char name[WAL_FILE_LEN];
|
||||
snprintf(name, sizeof(name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, lastId);
|
||||
if (remove(name) < 0) {
|
||||
wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, name, strerror(errno));
|
||||
} else {
|
||||
wDebug("vgId:%d, file:%s, it is removed", pWal->vgId, name);
|
||||
if (!pWal->keep) {
|
||||
// remove the oldest wal file
|
||||
int64_t oldFileId = -1;
|
||||
if (walGetOldFile(pWal, pWal->fileId, 2, &oldFileId) == 0) {
|
||||
char walName[WAL_FILE_LEN] = {0};
|
||||
snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId);
|
||||
|
||||
if (remove(walName) < 0) {
|
||||
wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno));
|
||||
} else {
|
||||
wDebug("vgId:%d, file:%s, it is removed", pWal->vgId, walName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,54 +111,47 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *
|
|||
|
||||
SWal * pWal = handle;
|
||||
int32_t count = 0;
|
||||
int32_t code = 0;
|
||||
int64_t fileId = -1;
|
||||
|
||||
DIR *dir = opendir(pWal->path);
|
||||
if (dir == NULL && errno == ENOENT) return 0;
|
||||
if (dir == NULL) return TAOS_SYSTEM_ERROR(errno);
|
||||
while ((code = walGetNextFile(pWal, &fileId)) >= 0) {
|
||||
if (fileId == pWal->fileId) continue;
|
||||
|
||||
struct dirent *ent;
|
||||
while ((ent = readdir(dir)) != NULL) {
|
||||
char *fileName = ent->d_name;
|
||||
char walName[WAL_FILE_LEN];
|
||||
snprintf(walName, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
|
||||
|
||||
if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
|
||||
int64_t fileId = atoll(fileName + WAL_PREFIX_LEN);
|
||||
if (fileId == pWal->fileId) continue;
|
||||
|
||||
char walName[WAL_FILE_LEN];
|
||||
snprintf(walName, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
|
||||
|
||||
wDebug("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
|
||||
int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, walName);
|
||||
if (code != TSDB_CODE_SUCCESS) continue;
|
||||
|
||||
|
||||
if (!pWal->keep) {
|
||||
wDebug("vgId:%d, file:%s, restore success, remove this file", pWal->vgId, walName);
|
||||
remove(walName);
|
||||
} else {
|
||||
wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName);
|
||||
}
|
||||
|
||||
count++;
|
||||
wDebug("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
|
||||
int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, walName);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
wDebug("vgId:%d, file:%s, failed to restore since %s", pWal->vgId, walName, tstrerror(code));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
closedir(dir);
|
||||
|
||||
if (pWal->keep) {
|
||||
if (count == 0) {
|
||||
wDebug("vgId:%d, file:%s not exist, renew it", pWal->vgId, pWal->name);
|
||||
return walRenew(pWal);
|
||||
if (!pWal->keep) {
|
||||
wDebug("vgId:%d, file:%s, restore success, remove this file", pWal->vgId, walName);
|
||||
remove(walName);
|
||||
} else {
|
||||
// open the existing WAL file in append mode
|
||||
pWal->fileId = 0;
|
||||
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
|
||||
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
if (pWal->fd < 0) {
|
||||
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
wDebug("vgId:%d, file:%s open success", pWal->vgId, pWal->name);
|
||||
wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName);
|
||||
}
|
||||
|
||||
count++;
|
||||
}
|
||||
|
||||
if (!pWal->keep) return TSDB_CODE_SUCCESS;
|
||||
|
||||
if (count == 0) {
|
||||
wDebug("vgId:%d, file:%s not exist, renew it", pWal->vgId, pWal->name);
|
||||
return walRenew(pWal);
|
||||
} else {
|
||||
// open the existing WAL file in append mode
|
||||
pWal->fileId = 0;
|
||||
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
|
||||
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
if (pWal->fd < 0) {
|
||||
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
wDebug("vgId:%d, file:%s open success", pWal->vgId, pWal->name);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -173,8 +162,9 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
|
|||
SWal *pWal = handle;
|
||||
|
||||
pthread_mutex_lock(&(pWal->mutex));
|
||||
int32_t code = walGetNextFile(pWal, *fileId, fileId, fileName);
|
||||
if (code == 0) {
|
||||
int32_t code = walGetNextFile(pWal, fileId);
|
||||
if (code >= 0) {
|
||||
sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId);
|
||||
code = (*fileId == pWal->fileId) ? 0 : 1;
|
||||
}
|
||||
pthread_mutex_unlock(&(pWal->mutex));
|
||||
|
@ -276,38 +266,3 @@ int64_t walGetVersion(twalh param) {
|
|||
|
||||
return pWal->version;
|
||||
}
|
||||
|
||||
static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId, char *nextFileName) {
|
||||
int64_t nearFileId = INT64_MAX;
|
||||
char nearFileName[WAL_FILE_LEN] = {0};
|
||||
|
||||
DIR *dir = opendir(pWal->path);
|
||||
if (dir == NULL) {
|
||||
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct dirent *ent;
|
||||
while ((ent = readdir(dir)) != NULL) {
|
||||
char *fileName = ent->d_name;
|
||||
|
||||
if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
|
||||
int64_t fileId = atoll(fileName + WAL_PREFIX_LEN);
|
||||
if (fileId <= lastFileId) continue;
|
||||
|
||||
if (fileId < nearFileId) {
|
||||
nearFileId = fileId;
|
||||
tstrncpy(nearFileName, fileName, WAL_FILE_LEN);
|
||||
}
|
||||
}
|
||||
}
|
||||
closedir(dir);
|
||||
|
||||
if (nearFileId == INT64_MAX) return -1;
|
||||
|
||||
*nexFileId = nearFileId;
|
||||
tstrncpy(nextFileName, nearFileName, WAL_FILE_LEN);
|
||||
wTrace("vgId:%d, path:%s, lastfile %" PRId64 ", nextfile is %s", pWal->vgId, pWal->path, lastFileId, nextFileName);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue