TD-1872 according to the suggestion of review
This commit is contained in:
parent
249e599bd1
commit
d103af8ff8
|
@ -32,7 +32,7 @@ extern int32_t wDebugFlag;
|
||||||
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
|
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
|
||||||
|
|
||||||
#define WAL_PREFIX "wal"
|
#define WAL_PREFIX "wal"
|
||||||
#define WAL_PREFIX_LEN 3
|
#define WAL_PREFIX_LEN 2
|
||||||
#define WAL_REFRESH_MS 1000
|
#define WAL_REFRESH_MS 1000
|
||||||
#define WAL_MAX_SIZE (1024 * 1024)
|
#define WAL_MAX_SIZE (1024 * 1024)
|
||||||
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
|
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
|
||||||
|
@ -56,6 +56,7 @@ typedef struct {
|
||||||
|
|
||||||
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
|
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
|
||||||
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
|
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
|
||||||
|
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,6 @@
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t refId;
|
int32_t refId;
|
||||||
int32_t num;
|
|
||||||
int32_t seq;
|
int32_t seq;
|
||||||
int8_t stop;
|
int8_t stop;
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
|
@ -85,7 +84,6 @@ void *walOpen(char *path, SWalCfg *pCfg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_add_fetch_32(&tsWal.num, 1);
|
|
||||||
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod);
|
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod);
|
||||||
|
|
||||||
return pWal;
|
return pWal;
|
||||||
|
@ -116,19 +114,28 @@ void walClose(void *handle) {
|
||||||
if (handle == NULL) return;
|
if (handle == NULL) return;
|
||||||
|
|
||||||
SWal *pWal = handle;
|
SWal *pWal = handle;
|
||||||
|
pthread_mutex_lock(&pWal->mutex);
|
||||||
|
|
||||||
taosClose(pWal->fd);
|
taosClose(pWal->fd);
|
||||||
|
|
||||||
if (!pWal->keep) {
|
if (!pWal->keep) {
|
||||||
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
|
int64_t fileId = -1;
|
||||||
if (remove(pWal->name) < 0) {
|
while (walGetNextFile(pWal, &fileId) >= 0) {
|
||||||
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
|
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
|
||||||
} else {
|
|
||||||
wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
|
if (fileId == pWal->fileId) {
|
||||||
|
wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name);
|
||||||
|
} else if (remove(pWal->name) < 0) {
|
||||||
|
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
|
||||||
|
} else {
|
||||||
|
wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name);
|
wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&pWal->mutex);
|
||||||
taosRemoveRef(tsWal.refId, pWal);
|
taosRemoveRef(tsWal.refId, pWal);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
|
|
||||||
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) {
|
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) {
|
||||||
int64_t curFileId = *nextFileId;
|
int64_t curFileId = *nextFileId;
|
||||||
int64_t nearFileId = INT64_MAX;
|
int64_t minFileId = INT64_MAX;
|
||||||
|
|
||||||
DIR *dir = opendir(pWal->path);
|
DIR *dir = opendir(pWal->path);
|
||||||
if (dir == NULL) {
|
if (dir == NULL) {
|
||||||
|
@ -35,23 +35,23 @@ int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) {
|
||||||
int64_t id = atoll(name + WAL_PREFIX_LEN);
|
int64_t id = atoll(name + WAL_PREFIX_LEN);
|
||||||
if (id <= curFileId) continue;
|
if (id <= curFileId) continue;
|
||||||
|
|
||||||
if (id < nearFileId) {
|
if (id < minFileId) {
|
||||||
nearFileId = id;
|
minFileId = id;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
closedir(dir);
|
closedir(dir);
|
||||||
|
|
||||||
if (nearFileId == INT64_MAX) return -1;
|
if (minFileId == INT64_MAX) return -1;
|
||||||
|
|
||||||
*nextFileId = nearFileId;
|
*nextFileId = minFileId;
|
||||||
wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " nextFileId:%" PRId64, pWal->vgId, pWal->path, curFileId, *nextFileId);
|
wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " nextFileId:%" PRId64, pWal->vgId, pWal->path, curFileId, *nextFileId);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId) {
|
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId) {
|
||||||
int64_t nearFileId = INT64_MAX;
|
int64_t minFileId = INT64_MAX;
|
||||||
|
|
||||||
DIR *dir = opendir(pWal->path);
|
DIR *dir = opendir(pWal->path);
|
||||||
if (dir == NULL) {
|
if (dir == NULL) {
|
||||||
|
@ -68,18 +68,51 @@ int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *o
|
||||||
if (id >= curFileId) continue;
|
if (id >= curFileId) continue;
|
||||||
|
|
||||||
minDiff--;
|
minDiff--;
|
||||||
if (id < nearFileId) {
|
if (id < minFileId) {
|
||||||
nearFileId = id;
|
minFileId = id;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
closedir(dir);
|
closedir(dir);
|
||||||
|
|
||||||
if (nearFileId == INT64_MAX) return -1;
|
if (minFileId == INT64_MAX) return -1;
|
||||||
if (minDiff > 0) return -1;
|
if (minDiff > 0) return -1;
|
||||||
|
|
||||||
*oldFileId = nearFileId;
|
*oldFileId = minFileId;
|
||||||
wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " oldFildId:%" PRId64, pWal->vgId, pWal->path, curFileId, *oldFileId);
|
wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " oldFildId:%" PRId64, pWal->vgId, pWal->path, curFileId, *oldFileId);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) {
|
||||||
|
int64_t maxFileId = INT64_MIN;
|
||||||
|
|
||||||
|
DIR *dir = opendir(pWal->path);
|
||||||
|
if (dir == NULL) {
|
||||||
|
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct dirent *ent;
|
||||||
|
while ((ent = readdir(dir)) != NULL) {
|
||||||
|
char *name = ent->d_name;
|
||||||
|
|
||||||
|
if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
|
||||||
|
int64_t id = atoll(name + WAL_PREFIX_LEN);
|
||||||
|
if (id > maxFileId) {
|
||||||
|
maxFileId = id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
closedir(dir);
|
||||||
|
|
||||||
|
if (maxFileId == INT64_MAX) {
|
||||||
|
*newFileId = 0;
|
||||||
|
} else {
|
||||||
|
*newFileId = maxFileId;
|
||||||
|
}
|
||||||
|
|
||||||
|
wTrace("vgId:%d, path:%s, newFileId:%" PRId64, pWal->vgId, pWal->path, *newFileId);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -36,7 +36,12 @@ int32_t walRenew(void *handle) {
|
||||||
wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name);
|
wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
pWal->fileId = (pWal->keep ? 0 : taosGetTimestampUs());
|
if (pWal->keep) {
|
||||||
|
pWal->fileId = 0;
|
||||||
|
} else {
|
||||||
|
if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0;
|
||||||
|
pWal->fileId++;
|
||||||
|
}
|
||||||
|
|
||||||
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
|
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
|
||||||
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
|
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||||
|
@ -82,6 +87,8 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
|
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
|
||||||
int32_t contLen = pHead->len + sizeof(SWalHead);
|
int32_t contLen = pHead->len + sizeof(SWalHead);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pWal->mutex);
|
||||||
|
|
||||||
if (taosTWrite(pWal->fd, pHead, contLen) != contLen) {
|
if (taosTWrite(pWal->fd, pHead, contLen) != contLen) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno));
|
wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno));
|
||||||
|
@ -90,6 +97,8 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
|
||||||
wTrace("vgId:%d, write version:%" PRId64 ", fileId:%" PRId64, pWal->vgId, pWal->version, pWal->fileId);
|
wTrace("vgId:%d, write version:%" PRId64 ", fileId:%" PRId64, pWal->vgId, pWal->version, pWal->fileId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&pWal->mutex);
|
||||||
|
|
||||||
ASSERT(contLen == pHead->len + sizeof(SWalHead));
|
ASSERT(contLen == pHead->len + sizeof(SWalHead));
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue