commit
ded4b1d32f
|
@ -51,6 +51,7 @@ void walCleanUp();
|
||||||
|
|
||||||
twalh walOpen(char *path, SWalCfg *pCfg);
|
twalh walOpen(char *path, SWalCfg *pCfg);
|
||||||
int32_t walAlter(twalh pWal, SWalCfg *pCfg);
|
int32_t walAlter(twalh pWal, SWalCfg *pCfg);
|
||||||
|
void walStop(twalh);
|
||||||
void walClose(twalh);
|
void walClose(twalh);
|
||||||
int32_t walRenew(twalh);
|
int32_t walRenew(twalh);
|
||||||
int32_t walWrite(twalh, SWalHead *);
|
int32_t walWrite(twalh, SWalHead *);
|
||||||
|
|
|
@ -386,6 +386,10 @@ void vnodeRelease(void *pVnodeRaw) {
|
||||||
pVnode->qMgmt = NULL;
|
pVnode->qMgmt = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pVnode->wal) {
|
||||||
|
walStop(pVnode->wal);
|
||||||
|
}
|
||||||
|
|
||||||
if (pVnode->tsdb) {
|
if (pVnode->tsdb) {
|
||||||
tsdbCloseRepo(pVnode->tsdb, 1);
|
tsdbCloseRepo(pVnode->tsdb, 1);
|
||||||
pVnode->tsdb = NULL;
|
pVnode->tsdb = NULL;
|
||||||
|
|
|
@ -49,6 +49,8 @@ typedef struct {
|
||||||
int32_t level;
|
int32_t level;
|
||||||
int32_t fsyncPeriod;
|
int32_t fsyncPeriod;
|
||||||
int32_t fsyncSeq;
|
int32_t fsyncSeq;
|
||||||
|
int8_t stop;
|
||||||
|
int8_t reserved[3];
|
||||||
char path[WAL_PATH_LEN];
|
char path[WAL_PATH_LEN];
|
||||||
char name[WAL_FILE_LEN];
|
char name[WAL_FILE_LEN];
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
|
|
|
@ -110,6 +110,16 @@ int32_t walAlter(void *handle, SWalCfg *pCfg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void walStop(void *handle) {
|
||||||
|
if (handle == NULL) return;
|
||||||
|
SWal *pWal = handle;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pWal->mutex);
|
||||||
|
pWal->stop = 1;
|
||||||
|
pthread_mutex_unlock(&pWal->mutex);
|
||||||
|
wDebug("vgId:%d, stop write wal", pWal->vgId);
|
||||||
|
}
|
||||||
|
|
||||||
void walClose(void *handle) {
|
void walClose(void *handle) {
|
||||||
if (handle == NULL) return;
|
if (handle == NULL) return;
|
||||||
|
|
||||||
|
@ -123,9 +133,7 @@ void walClose(void *handle) {
|
||||||
while (walGetNextFile(pWal, &fileId) >= 0) {
|
while (walGetNextFile(pWal, &fileId) >= 0) {
|
||||||
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
|
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
|
||||||
|
|
||||||
if (fileId == pWal->fileId) {
|
if (remove(pWal->name) < 0) {
|
||||||
wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name);
|
|
||||||
} else if (remove(pWal->name) < 0) {
|
|
||||||
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
|
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
|
||||||
} else {
|
} else {
|
||||||
wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
|
wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
|
||||||
|
|
|
@ -29,6 +29,11 @@ int32_t walRenew(void *handle) {
|
||||||
SWal * pWal = handle;
|
SWal * pWal = handle;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (pWal->stop) {
|
||||||
|
wDebug("vgId:%d, do not create a new wal file", pWal->vgId);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
pthread_mutex_lock(&pWal->mutex);
|
pthread_mutex_lock(&pWal->mutex);
|
||||||
|
|
||||||
if (pWal->fd >= 0) {
|
if (pWal->fd >= 0) {
|
||||||
|
@ -151,7 +156,7 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *
|
||||||
if (!pWal->keep) return TSDB_CODE_SUCCESS;
|
if (!pWal->keep) return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
wDebug("vgId:%d, file:%s not exist, renew it", pWal->vgId, pWal->name);
|
wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId);
|
||||||
return walRenew(pWal);
|
return walRenew(pWal);
|
||||||
} else {
|
} else {
|
||||||
// open the existing WAL file in append mode
|
// open the existing WAL file in append mode
|
||||||
|
|
|
@ -8,8 +8,8 @@ sleep 3000
|
||||||
sql connect
|
sql connect
|
||||||
|
|
||||||
$i = 0
|
$i = 0
|
||||||
$dbPrefix = tb_in_db
|
$dbPrefix = d
|
||||||
$tbPrefix = tb_in_tb
|
$tbPrefix = t
|
||||||
$db = $dbPrefix . $i
|
$db = $dbPrefix . $i
|
||||||
$tb = $tbPrefix . $i
|
$tb = $tbPrefix . $i
|
||||||
|
|
||||||
|
@ -22,28 +22,27 @@ sql create table $tb (ts timestamp, speed int)
|
||||||
|
|
||||||
$x = 0
|
$x = 0
|
||||||
while $x < 10
|
while $x < 10
|
||||||
$ms = $x . m
|
$cc = $x * 60000
|
||||||
sql insert into $tb values (now + $ms , $x )
|
$ms = 1601481600000 + $cc
|
||||||
|
|
||||||
|
sql insert into $tb values ($ms , $x )
|
||||||
$x = $x + 1
|
$x = $x + 1
|
||||||
endw
|
endw
|
||||||
|
|
||||||
print =============== step 2
|
print =============== step 2
|
||||||
sql insert into $tb values (now - 5m , 10)
|
$x = 0
|
||||||
sql insert into $tb values (now - 6m , 10)
|
while $x < 5
|
||||||
sql insert into $tb values (now - 7m , 10)
|
$cc = $x * 60000
|
||||||
sql insert into $tb values (now - 8m , 10)
|
$ms = 1551481600000 + $cc
|
||||||
|
|
||||||
|
sql insert into $tb values ($ms , $x )
|
||||||
|
$x = $x + 1
|
||||||
|
endw
|
||||||
|
|
||||||
sql select * from $tb
|
sql select * from $tb
|
||||||
|
|
||||||
print $rows points data are retrieved
|
print $rows points data are retrieved
|
||||||
if $rows != 14 then
|
if $rows != 15 then
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
sql drop database $db
|
|
||||||
sleep 1000
|
|
||||||
sql show databases
|
|
||||||
if $rows != 0 then
|
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue