diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 66ea5ea5c7..f74e26eeda 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -95,6 +95,7 @@ typedef struct { } SWalCkHead; #pragma pack(pop) +typedef void (*stopDnodeFn)(); typedef struct SWal { // cfg SWalCfg cfg; @@ -119,6 +120,9 @@ typedef struct SWal { char path[WAL_PATH_LEN]; // reusable write head SWalCkHead writeHead; + + stopDnodeFn stopDnode; + } SWal; typedef struct { @@ -152,7 +156,7 @@ typedef struct SWalReader { } SWalReader; // module initialization -int32_t walInit(); +int32_t walInit(stopDnodeFn stopDnode); void walCleanUp(); // handle open and ctl diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 20802e33d9..48606b2ed9 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -85,7 +85,7 @@ static int32_t mndOpenWrapper(const char *path, SMnodeOpt *opt, SMnode **pMnode) } static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { int32_t code = 0; - if ((code = walInit()) != 0) { + if ((code = walInit(pInput->stopDnodeFp)) != 0) { dError("failed to init wal since %s", tstrerror(code)); return code; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 6bc0b5fe93..e599676cec 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -624,8 +624,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { goto _OVER; } tmsgReportStartup("vnode-tfs", "initialized"); - - if ((code = walInit()) != 0) { + if ((code = walInit(pInput->stopDnodeFp)) != 0) { dError("failed to init wal since %s", tstrerror(code)); goto _OVER; } @@ -638,7 +637,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { } tmsgReportStartup("vnode-sync", "initialized"); - if ((code = vnodeInit(tsNumOfCommitThreads)) != 0) { + if ((code = vnodeInit(tsNumOfCommitThreads, pInput->stopDnodeFp)) != 0) { dError("failed to init vnode since %s", tstrerror(code)); goto _OVER; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 9819c4f64e..0a75847d96 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -414,6 +414,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite, .getMnodeLoadsFp = dmGetMnodeLoads, .getQnodeLoadsFp = dmGetQnodeLoads, + .stopDnodeFp = dmStop, }; opt.msgCb = dmGetMsgcb(pWrapper->pDnode); diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 3b94f00bee..5be41f830d 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -121,6 +121,7 @@ typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo); typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo); typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo); typedef int32_t (*ProcessAlterNodeTypeFp)(EDndNodeType ntype, SRpcMsg *pMsg); +typedef void (*StopDnodeFp)(); typedef struct { int32_t dnodeId; @@ -159,6 +160,7 @@ typedef struct { GetVnodeLoadsFp getVnodeLoadsLiteFp; GetMnodeLoadsFp getMnodeLoadsFp; GetQnodeLoadsFp getQnodeLoadsFp; + StopDnodeFp stopDnodeFp; } SMgmtInputOpt; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndDump.c b/source/dnode/mnode/impl/src/mndDump.c index 31e092f1a4..565e244014 100644 --- a/source/dnode/mnode/impl/src/mndDump.c +++ b/source/dnode/mnode/impl/src/mndDump.c @@ -581,7 +581,7 @@ void mndDumpSdb() { msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack tmsgSetDefault(&msgCb); - (void)walInit(); + (void)walInit(NULL); (void)syncInit(); SMnodeOpt opt = {.msgCb = msgCb}; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index d01db56013..2f56aac7d6 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -49,7 +49,9 @@ typedef struct SVSnapWriter SVSnapWriter; extern const SVnodeCfg vnodeCfgDefault; -int32_t vnodeInit(int32_t nthreads); +typedef void (*StopDnodeFp)(); + +int32_t vnodeInit(int32_t nthreads, StopDnodeFp stopDnodeFp); void vnodeCleanup(); int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs); bool vnodeShouldRemoveWal(SVnode *pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index 8b7de7058c..709bfa19bc 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -18,13 +18,13 @@ static volatile int32_t VINIT = 0; -int vnodeInit(int nthreads) { +int vnodeInit(int nthreads, StopDnodeFp stopDnodeFp) { if (atomic_val_compare_exchange_32(&VINIT, 0, 1)) { return 0; } TAOS_CHECK_RETURN(vnodeAsyncOpen(nthreads)); - TAOS_CHECK_RETURN(walInit()); + TAOS_CHECK_RETURN(walInit(stopDnodeFp)); return 0; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 9da3207471..581a63671c 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -26,6 +26,7 @@ typedef struct { uint32_t seq; int32_t refSetId; TdThread thread; + stopDnodeFn stopDnode; } SWalMgmt; static SWalMgmt tsWal = {0, .seq = 1}; @@ -35,7 +36,7 @@ static void walFreeObj(void *pWal); int64_t walGetSeq() { return (int64_t)atomic_load_32((volatile int32_t *)&tsWal.seq); } -int32_t walInit() { +int32_t walInit(stopDnodeFn stopDnode) { int8_t old; while (1) { old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 2); @@ -57,6 +58,11 @@ int32_t walInit() { atomic_store_8(&tsWal.inited, 1); } + if (stopDnode == NULL) { + wWarn("failed to set stop dnode call back"); + } + tsWal.stopDnode = stopDnode; + return 0; } @@ -164,6 +170,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { goto _err; } + pWal->stopDnode = tsWal.stopDnode; + wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod); return pWal; diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index dc3b2df52c..9979ddd0b0 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -525,6 +525,11 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { if (size != sizeof(SWalIdxEntry)) { wError("vgId:%d, failed to write idx entry due to %s. ver:%" PRId64, pWal->cfg.vgId, strerror(errno), ver); + if (pWal->stopDnode != NULL) { + wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId); + pWal->stopDnode(); + } + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } @@ -571,6 +576,11 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + if (pWal->stopDnode != NULL) { + wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId); + pWal->stopDnode(); + } + TAOS_CHECK_GOTO(code, &lino, _exit); } @@ -627,6 +637,11 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy taosMemoryFreeClear(newBodyEncrypted); } + if (pWal->stopDnode != NULL) { + wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId); + pWal->stopDnode(); + } + TAOS_CHECK_GOTO(code, &lino, _exit); }