fix: init once
This commit is contained in:
parent
3629958b43
commit
0ad9c4cebd
|
@ -104,7 +104,7 @@ typedef struct {
|
||||||
tmr_h timer;
|
tmr_h timer;
|
||||||
} STqMgmt;
|
} STqMgmt;
|
||||||
|
|
||||||
static STqMgmt tqMgmt;
|
static STqMgmt tqMgmt = {0};
|
||||||
|
|
||||||
// init once
|
// init once
|
||||||
int tqInit();
|
int tqInit();
|
||||||
|
|
|
@ -16,22 +16,34 @@
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
|
||||||
int32_t tqInit() {
|
int32_t tqInit() {
|
||||||
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
|
int8_t old;
|
||||||
|
while (1) {
|
||||||
|
old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
|
||||||
|
if (old != 2) break;
|
||||||
|
}
|
||||||
|
|
||||||
if (old == 0) {
|
if (old == 0) {
|
||||||
tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
|
tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
|
||||||
if (tqMgmt.timer == NULL) {
|
if (tqMgmt.timer == NULL) {
|
||||||
atomic_store_8(&tqMgmt.inited, 0);
|
atomic_store_8(&tqMgmt.inited, 0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
atomic_store_8(&tqMgmt.inited, 1);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqCleanUp() {
|
void tqCleanUp() {
|
||||||
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
|
int8_t old;
|
||||||
if (old != 1) return;
|
while (1) {
|
||||||
taosTmrCleanUp(tqMgmt.timer);
|
old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
|
||||||
atomic_store_8(&tqMgmt.inited, 0);
|
if (old != 2) break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (old == 1) {
|
||||||
|
taosTmrCleanUp(tqMgmt.timer);
|
||||||
|
atomic_store_8(&tqMgmt.inited, 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||||
|
|
|
@ -36,31 +36,42 @@ static void walFreeObj(void *pWal);
|
||||||
int64_t walGetSeq() { return (int64_t)atomic_load_32(&tsWal.seq); }
|
int64_t walGetSeq() { return (int64_t)atomic_load_32(&tsWal.seq); }
|
||||||
|
|
||||||
int32_t walInit() {
|
int32_t walInit() {
|
||||||
int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1);
|
int8_t old;
|
||||||
if (old == 1) return 0;
|
while (1) {
|
||||||
|
old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 2);
|
||||||
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
|
if (old != 2) break;
|
||||||
|
}
|
||||||
int32_t code = walCreateThread();
|
|
||||||
if (code != 0) {
|
if (old == 0) {
|
||||||
wError("failed to init wal module since %s", tstrerror(code));
|
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
|
||||||
atomic_store_8(&tsWal.inited, 0);
|
|
||||||
return code;
|
int32_t code = walCreateThread();
|
||||||
|
if (code != 0) {
|
||||||
|
wError("failed to init wal module since %s", tstrerror(code));
|
||||||
|
atomic_store_8(&tsWal.inited, 0);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId);
|
||||||
|
atomic_store_8(&tsWal.inited, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void walCleanUp() {
|
void walCleanUp() {
|
||||||
int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 2);
|
int8_t old;
|
||||||
if (old != 1) {
|
while (1) {
|
||||||
return;
|
old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 2);
|
||||||
|
if (old != 2) break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (old == 1) {
|
||||||
|
walStopThread();
|
||||||
|
taosCloseRef(tsWal.refSetId);
|
||||||
|
wInfo("wal module is cleaned up");
|
||||||
|
atomic_store_8(&tsWal.inited, 0);
|
||||||
}
|
}
|
||||||
walStopThread();
|
|
||||||
taosCloseRef(tsWal.refSetId);
|
|
||||||
wInfo("wal module is cleaned up");
|
|
||||||
atomic_store_8(&tsWal.inited, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
|
|
Loading…
Reference in New Issue