fix/TD-30849
This commit is contained in:
parent
9f47f8467d
commit
b85dba328c
|
@ -95,6 +95,7 @@ typedef struct {
|
||||||
} SWalCkHead;
|
} SWalCkHead;
|
||||||
#pragma pack(pop)
|
#pragma pack(pop)
|
||||||
|
|
||||||
|
typedef void (*stopDnodeFn)();
|
||||||
typedef struct SWal {
|
typedef struct SWal {
|
||||||
// cfg
|
// cfg
|
||||||
SWalCfg cfg;
|
SWalCfg cfg;
|
||||||
|
@ -119,6 +120,9 @@ typedef struct SWal {
|
||||||
char path[WAL_PATH_LEN];
|
char path[WAL_PATH_LEN];
|
||||||
// reusable write head
|
// reusable write head
|
||||||
SWalCkHead writeHead;
|
SWalCkHead writeHead;
|
||||||
|
|
||||||
|
stopDnodeFn stopDnode;
|
||||||
|
|
||||||
} SWal;
|
} SWal;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -152,7 +156,7 @@ typedef struct SWalReader {
|
||||||
} SWalReader;
|
} SWalReader;
|
||||||
|
|
||||||
// module initialization
|
// module initialization
|
||||||
int32_t walInit();
|
int32_t walInit(stopDnodeFn stopDnode);
|
||||||
void walCleanUp();
|
void walCleanUp();
|
||||||
|
|
||||||
// handle open and ctl
|
// handle open and ctl
|
||||||
|
|
|
@ -85,7 +85,7 @@ static int32_t mndOpenWrapper(const char *path, SMnodeOpt *opt, SMnode **pMnode)
|
||||||
}
|
}
|
||||||
static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if ((code = walInit()) != 0) {
|
if ((code = walInit(pInput->stopDnodeFp)) != 0) {
|
||||||
dError("failed to init wal since %s", tstrerror(code));
|
dError("failed to init wal since %s", tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -624,8 +624,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
tmsgReportStartup("vnode-tfs", "initialized");
|
tmsgReportStartup("vnode-tfs", "initialized");
|
||||||
|
if ((code = walInit(pInput->stopDnodeFp)) != 0) {
|
||||||
if ((code = walInit()) != 0) {
|
|
||||||
dError("failed to init wal since %s", tstrerror(code));
|
dError("failed to init wal since %s", tstrerror(code));
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
@ -638,7 +637,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||||
}
|
}
|
||||||
tmsgReportStartup("vnode-sync", "initialized");
|
tmsgReportStartup("vnode-sync", "initialized");
|
||||||
|
|
||||||
if ((code = vnodeInit(tsNumOfCommitThreads)) != 0) {
|
if ((code = vnodeInit(tsNumOfCommitThreads, pInput->stopDnodeFp)) != 0) {
|
||||||
dError("failed to init vnode since %s", tstrerror(code));
|
dError("failed to init vnode since %s", tstrerror(code));
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
|
@ -414,6 +414,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
|
||||||
.getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
|
.getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
|
||||||
.getMnodeLoadsFp = dmGetMnodeLoads,
|
.getMnodeLoadsFp = dmGetMnodeLoads,
|
||||||
.getQnodeLoadsFp = dmGetQnodeLoads,
|
.getQnodeLoadsFp = dmGetQnodeLoads,
|
||||||
|
.stopDnodeFp = dmStop,
|
||||||
};
|
};
|
||||||
|
|
||||||
opt.msgCb = dmGetMsgcb(pWrapper->pDnode);
|
opt.msgCb = dmGetMsgcb(pWrapper->pDnode);
|
||||||
|
|
|
@ -121,6 +121,7 @@ typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo);
|
||||||
typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo);
|
typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo);
|
||||||
typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo);
|
typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo);
|
||||||
typedef int32_t (*ProcessAlterNodeTypeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
|
typedef int32_t (*ProcessAlterNodeTypeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
|
||||||
|
typedef void (*StopDnodeFp)();
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
|
@ -159,6 +160,7 @@ typedef struct {
|
||||||
GetVnodeLoadsFp getVnodeLoadsLiteFp;
|
GetVnodeLoadsFp getVnodeLoadsLiteFp;
|
||||||
GetMnodeLoadsFp getMnodeLoadsFp;
|
GetMnodeLoadsFp getMnodeLoadsFp;
|
||||||
GetQnodeLoadsFp getQnodeLoadsFp;
|
GetQnodeLoadsFp getQnodeLoadsFp;
|
||||||
|
StopDnodeFp stopDnodeFp;
|
||||||
} SMgmtInputOpt;
|
} SMgmtInputOpt;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -581,7 +581,7 @@ void mndDumpSdb() {
|
||||||
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
|
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
|
||||||
tmsgSetDefault(&msgCb);
|
tmsgSetDefault(&msgCb);
|
||||||
|
|
||||||
(void)walInit();
|
(void)walInit(NULL);
|
||||||
(void)syncInit();
|
(void)syncInit();
|
||||||
|
|
||||||
SMnodeOpt opt = {.msgCb = msgCb};
|
SMnodeOpt opt = {.msgCb = msgCb};
|
||||||
|
|
|
@ -49,7 +49,9 @@ typedef struct SVSnapWriter SVSnapWriter;
|
||||||
|
|
||||||
extern const SVnodeCfg vnodeCfgDefault;
|
extern const SVnodeCfg vnodeCfgDefault;
|
||||||
|
|
||||||
int32_t vnodeInit(int32_t nthreads);
|
typedef void (*StopDnodeFp)();
|
||||||
|
|
||||||
|
int32_t vnodeInit(int32_t nthreads, StopDnodeFp stopDnodeFp);
|
||||||
void vnodeCleanup();
|
void vnodeCleanup();
|
||||||
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs);
|
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs);
|
||||||
bool vnodeShouldRemoveWal(SVnode *pVnode);
|
bool vnodeShouldRemoveWal(SVnode *pVnode);
|
||||||
|
|
|
@ -18,13 +18,13 @@
|
||||||
|
|
||||||
static volatile int32_t VINIT = 0;
|
static volatile int32_t VINIT = 0;
|
||||||
|
|
||||||
int vnodeInit(int nthreads) {
|
int vnodeInit(int nthreads, StopDnodeFp stopDnodeFp) {
|
||||||
if (atomic_val_compare_exchange_32(&VINIT, 0, 1)) {
|
if (atomic_val_compare_exchange_32(&VINIT, 0, 1)) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_CHECK_RETURN(vnodeAsyncOpen(nthreads));
|
TAOS_CHECK_RETURN(vnodeAsyncOpen(nthreads));
|
||||||
TAOS_CHECK_RETURN(walInit());
|
TAOS_CHECK_RETURN(walInit(stopDnodeFp));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ typedef struct {
|
||||||
uint32_t seq;
|
uint32_t seq;
|
||||||
int32_t refSetId;
|
int32_t refSetId;
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
|
stopDnodeFn stopDnode;
|
||||||
} SWalMgmt;
|
} SWalMgmt;
|
||||||
|
|
||||||
static SWalMgmt tsWal = {0, .seq = 1};
|
static SWalMgmt tsWal = {0, .seq = 1};
|
||||||
|
@ -35,7 +36,7 @@ static void walFreeObj(void *pWal);
|
||||||
|
|
||||||
int64_t walGetSeq() { return (int64_t)atomic_load_32((volatile int32_t *)&tsWal.seq); }
|
int64_t walGetSeq() { return (int64_t)atomic_load_32((volatile int32_t *)&tsWal.seq); }
|
||||||
|
|
||||||
int32_t walInit() {
|
int32_t walInit(stopDnodeFn stopDnode) {
|
||||||
int8_t old;
|
int8_t old;
|
||||||
while (1) {
|
while (1) {
|
||||||
old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 2);
|
old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 2);
|
||||||
|
@ -57,6 +58,11 @@ int32_t walInit() {
|
||||||
atomic_store_8(&tsWal.inited, 1);
|
atomic_store_8(&tsWal.inited, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (stopDnode == NULL) {
|
||||||
|
wWarn("failed to set stop dnode call back");
|
||||||
|
}
|
||||||
|
tsWal.stopDnode = stopDnode;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,6 +170,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pWal->stopDnode = tsWal.stopDnode;
|
||||||
|
|
||||||
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
|
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
|
||||||
pWal->cfg.fsyncPeriod);
|
pWal->cfg.fsyncPeriod);
|
||||||
return pWal;
|
return pWal;
|
||||||
|
|
|
@ -525,6 +525,11 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||||
if (size != sizeof(SWalIdxEntry)) {
|
if (size != sizeof(SWalIdxEntry)) {
|
||||||
wError("vgId:%d, failed to write idx entry due to %s. ver:%" PRId64, pWal->cfg.vgId, strerror(errno), ver);
|
wError("vgId:%d, failed to write idx entry due to %s. ver:%" PRId64, pWal->cfg.vgId, strerror(errno), ver);
|
||||||
|
|
||||||
|
if (pWal->stopDnode != NULL) {
|
||||||
|
wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId);
|
||||||
|
pWal->stopDnode();
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -571,6 +576,11 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
||||||
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
|
|
||||||
|
if (pWal->stopDnode != NULL) {
|
||||||
|
wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId);
|
||||||
|
pWal->stopDnode();
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -627,6 +637,11 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
||||||
taosMemoryFreeClear(newBodyEncrypted);
|
taosMemoryFreeClear(newBodyEncrypted);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pWal->stopDnode != NULL) {
|
||||||
|
wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId);
|
||||||
|
pWal->stopDnode();
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue