Merge pull request #4195 from taosdata/feature/wal
[TD-1896]<fix>: Wal data may be lost when the system crashes
This commit is contained in:
commit
7cce5d8871
|
@ -44,6 +44,7 @@ extern int32_t tsMaxShellConns;
|
||||||
extern int32_t tsShellActivityTimer;
|
extern int32_t tsShellActivityTimer;
|
||||||
extern uint32_t tsMaxTmrCtrl;
|
extern uint32_t tsMaxTmrCtrl;
|
||||||
extern float tsNumOfThreadsPerCore;
|
extern float tsNumOfThreadsPerCore;
|
||||||
|
extern int32_t tsNumOfCommitThreads;
|
||||||
extern float tsRatioOfQueryThreads; // todo remove it
|
extern float tsRatioOfQueryThreads; // todo remove it
|
||||||
extern int8_t tsDaylight;
|
extern int8_t tsDaylight;
|
||||||
extern char tsTimezone[];
|
extern char tsTimezone[];
|
||||||
|
|
|
@ -51,6 +51,7 @@ int32_t tsMaxShellConns = 5000;
|
||||||
int32_t tsMaxConnections = 5000;
|
int32_t tsMaxConnections = 5000;
|
||||||
int32_t tsShellActivityTimer = 3; // second
|
int32_t tsShellActivityTimer = 3; // second
|
||||||
float tsNumOfThreadsPerCore = 1.0f;
|
float tsNumOfThreadsPerCore = 1.0f;
|
||||||
|
int32_t tsNumOfCommitThreads = 1;
|
||||||
float tsRatioOfQueryThreads = 0.5f;
|
float tsRatioOfQueryThreads = 0.5f;
|
||||||
int8_t tsDaylight = 0;
|
int8_t tsDaylight = 0;
|
||||||
char tsTimezone[TSDB_TIMEZONE_LEN] = {0};
|
char tsTimezone[TSDB_TIMEZONE_LEN] = {0};
|
||||||
|
@ -426,6 +427,16 @@ static void doInitGlobalConfig(void) {
|
||||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||||
taosInitConfigOption(cfg);
|
taosInitConfigOption(cfg);
|
||||||
|
|
||||||
|
cfg.option = "numOfCommitThreads";
|
||||||
|
cfg.ptr = &tsNumOfCommitThreads;
|
||||||
|
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||||
|
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
|
||||||
|
cfg.minValue = 1;
|
||||||
|
cfg.maxValue = 100;
|
||||||
|
cfg.ptrLength = 0;
|
||||||
|
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||||
|
taosInitConfigOption(cfg);
|
||||||
|
|
||||||
cfg.option = "ratioOfQueryThreads";
|
cfg.option = "ratioOfQueryThreads";
|
||||||
cfg.ptr = &tsRatioOfQueryThreads;
|
cfg.ptr = &tsRatioOfQueryThreads;
|
||||||
cfg.valType = TAOS_CFG_VTYPE_FLOAT;
|
cfg.valType = TAOS_CFG_VTYPE_FLOAT;
|
||||||
|
|
|
@ -54,16 +54,17 @@ typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg)
|
||||||
int32_t walInit();
|
int32_t walInit();
|
||||||
void walCleanUp();
|
void walCleanUp();
|
||||||
|
|
||||||
twalh walOpen(char *path, SWalCfg *pCfg);
|
twalh walOpen(char *path, SWalCfg *pCfg);
|
||||||
int32_t walAlter(twalh pWal, SWalCfg *pCfg);
|
int32_t walAlter(twalh pWal, SWalCfg *pCfg);
|
||||||
void walStop(twalh);
|
void walStop(twalh);
|
||||||
void walClose(twalh);
|
void walClose(twalh);
|
||||||
int32_t walRenew(twalh);
|
int32_t walRenew(twalh);
|
||||||
void walRemoveOldFiles(twalh);
|
void walRemoveOneOldFile(twalh);
|
||||||
int32_t walWrite(twalh, SWalHead *);
|
void walRemoveAllOldFiles(twalh);
|
||||||
void walFsync(twalh, bool forceFsync);
|
int32_t walWrite(twalh, SWalHead *);
|
||||||
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
|
void walFsync(twalh, bool forceFsync);
|
||||||
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
|
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
|
||||||
|
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
|
||||||
uint64_t walGetVersion(twalh);
|
uint64_t walGetVersion(twalh);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -377,7 +377,8 @@ int taosCheckVersion(char *input_client_version, char *input_server_version, int
|
||||||
|
|
||||||
for(int32_t i = 0; i < comparedSegments; ++i) {
|
for(int32_t i = 0; i < comparedSegments; ++i) {
|
||||||
if (clientVersionNumber[i] != serverVersionNumber[i]) {
|
if (clientVersionNumber[i] != serverVersionNumber[i]) {
|
||||||
uError("the %d-th number of server version:%s not matched with client version:%s", i, server_version, version);
|
uError("the %d-th number of server version:%s not matched with client version:%s", i, server_version,
|
||||||
|
client_version);
|
||||||
return TSDB_CODE_TSC_INVALID_VERSION;
|
return TSDB_CODE_TSC_INVALID_VERSION;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,8 +28,6 @@
|
||||||
#include "vnodeCfg.h"
|
#include "vnodeCfg.h"
|
||||||
#include "vnodeVersion.h"
|
#include "vnodeVersion.h"
|
||||||
|
|
||||||
#define DEFAULT_COMMIT_THREADS 1
|
|
||||||
|
|
||||||
static SHashObj*tsVnodesHash;
|
static SHashObj*tsVnodesHash;
|
||||||
static void vnodeCleanUp(SVnodeObj *pVnode);
|
static void vnodeCleanUp(SVnodeObj *pVnode);
|
||||||
static int vnodeProcessTsdbStatus(void *arg, int status);
|
static int vnodeProcessTsdbStatus(void *arg, int status);
|
||||||
|
@ -69,7 +67,7 @@ int32_t vnodeInitResources() {
|
||||||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbInitCommitQueue(DEFAULT_COMMIT_THREADS) < 0) {
|
if (tsdbInitCommitQueue(tsNumOfCommitThreads) < 0) {
|
||||||
vError("failed to init vnode commit queue");
|
vError("failed to init vnode commit queue");
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -317,6 +315,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
pVnode->version = walGetVersion(pVnode->wal);
|
pVnode->version = walGetVersion(pVnode->wal);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsdbSyncCommit(pVnode->tsdb);
|
||||||
|
walRemoveAllOldFiles(pVnode->wal);
|
||||||
walRenew(pVnode->wal);
|
walRenew(pVnode->wal);
|
||||||
|
|
||||||
SSyncInfo syncInfo;
|
SSyncInfo syncInfo;
|
||||||
|
@ -592,7 +592,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
|
||||||
|
|
||||||
if (status == TSDB_STATUS_COMMIT_OVER) {
|
if (status == TSDB_STATUS_COMMIT_OVER) {
|
||||||
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
|
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
|
||||||
walRemoveOldFiles(pVnode->wal);
|
walRemoveOneOldFile(pVnode->wal);
|
||||||
return vnodeSaveVersion(pVnode);
|
return vnodeSaveVersion(pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -128,16 +128,7 @@ void walClose(void *handle) {
|
||||||
taosClose(pWal->fd);
|
taosClose(pWal->fd);
|
||||||
|
|
||||||
if (pWal->keep != TAOS_WAL_KEEP) {
|
if (pWal->keep != TAOS_WAL_KEEP) {
|
||||||
int64_t fileId = -1;
|
walRemoveAllOldFiles(pWal);
|
||||||
while (walGetNextFile(pWal, &fileId) >= 0) {
|
|
||||||
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
|
|
||||||
|
|
||||||
if (remove(pWal->name) < 0) {
|
|
||||||
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
|
|
||||||
} else {
|
|
||||||
wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name);
|
wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name);
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,7 +63,7 @@ int32_t walRenew(void *handle) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void walRemoveOldFiles(void *handle) {
|
void walRemoveOneOldFile(void *handle) {
|
||||||
SWal *pWal = handle;
|
SWal *pWal = handle;
|
||||||
if (pWal == NULL) return;
|
if (pWal == NULL) return;
|
||||||
if (pWal->keep == TAOS_WAL_KEEP) return;
|
if (pWal->keep == TAOS_WAL_KEEP) return;
|
||||||
|
@ -86,6 +86,22 @@ void walRemoveOldFiles(void *handle) {
|
||||||
pthread_mutex_unlock(&pWal->mutex);
|
pthread_mutex_unlock(&pWal->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void walRemoveAllOldFiles(void *handle) {
|
||||||
|
if (handle == NULL) return;
|
||||||
|
|
||||||
|
SWal * pWal = handle;
|
||||||
|
int64_t fileId = -1;
|
||||||
|
while (walGetNextFile(pWal, &fileId) >= 0) {
|
||||||
|
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
|
||||||
|
|
||||||
|
if (remove(pWal->name) < 0) {
|
||||||
|
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
|
||||||
|
} else {
|
||||||
|
wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t walWrite(void *handle, SWalHead *pHead) {
|
int32_t walWrite(void *handle, SWalHead *pHead) {
|
||||||
if (handle == NULL) return -1;
|
if (handle == NULL) return -1;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 100
|
||||||
|
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 1
|
||||||
|
system sh/cfg.sh -n dnode1 -c tableIncStepPerVnode -v 2
|
||||||
|
|
||||||
|
|
||||||
|
print ============== deploy
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sleep 3001
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql create database d1
|
||||||
|
sql use d1
|
||||||
|
sql create table st (ts timestamp, tbcol int) TAGS(tgcol int)
|
||||||
|
|
||||||
|
$i = 0
|
||||||
|
while $i < 100
|
||||||
|
$tb = t . $i
|
||||||
|
sql create table $tb using st tags( $i )
|
||||||
|
sql insert into $tb values (now , $i )
|
||||||
|
$i = $i + 1
|
||||||
|
endw
|
||||||
|
|
||||||
|
sql_error sql create table tt (ts timestamp, i int)
|
||||||
|
|
||||||
|
print =============== step3
|
||||||
|
sql select * from st;
|
||||||
|
if $rows != 100 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 4
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
print =============== step4
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
sql select * from st;
|
||||||
|
if $rows != 100 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -238,6 +238,7 @@ cd ../../../debug; make
|
||||||
|
|
||||||
./test.sh -f general/wal/sync.sim
|
./test.sh -f general/wal/sync.sim
|
||||||
./test.sh -f general/wal/kill.sim
|
./test.sh -f general/wal/kill.sim
|
||||||
|
./test.sh -f general/wal/maxtables.sim
|
||||||
|
|
||||||
./test.sh -f unique/account/account_create.sim
|
./test.sh -f unique/account/account_create.sim
|
||||||
./test.sh -f unique/account/account_delete.sim
|
./test.sh -f unique/account/account_delete.sim
|
||||||
|
|
Loading…
Reference in New Issue