From 5b2b4da11bf0c9cd5604a456b35ec1d83b7275a8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 4 May 2022 16:49:24 +0800 Subject: [PATCH 1/3] refactor: fix: reset confict trans types --- include/util/taoserror.h | 3 ++- source/dnode/mnode/impl/src/mndTrans.c | 31 ++++++++++++++------------ source/util/src/terror.c | 3 ++- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b8baf99552..abc752955d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -264,7 +264,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0) #define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1) #define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2) -#define TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL TAOS_DEF_ERROR_CODE(0, 0x03D4) +#define TSDB_CODE_MND_TRANS_CONFLICT TAOS_DEF_ERROR_CODE(0, 0x03D3) +#define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03D4) // mnode-mq #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index ce2a0efd2f..1c238d4082 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -710,12 +710,12 @@ static bool mndIsStbTrans(STrans *pTrans) { return pTrans->type > TRN_TYPE_STB_SCOPE && pTrans->type < TRN_TYPE_STB_SCOPE_END; } -static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) { +static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) { STrans *pTrans = NULL; void *pIter = NULL; - bool canParallel = true; + bool conflict = false; - if (mndIsBasicTrans(pNewTrans)) return canParallel; + if (mndIsBasicTrans(pNewTrans)) return conflict; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); @@ -724,7 +724,7 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) { if (mndIsGlobalTrans(pNewTrans)) { if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); - canParallel = false; + conflict = true; } else { } } @@ -732,11 +732,11 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) { else if (mndIsDbTrans(pNewTrans)) { if (mndIsGlobalTrans(pTrans)) { mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); - canParallel = false; + conflict = true; } else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { if (pNewTrans->dbUid == pTrans->dbUid) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); - canParallel = false; + conflict = true; } } else { } @@ -745,11 +745,11 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) { else if (mndIsStbTrans(pNewTrans)) { if (mndIsGlobalTrans(pTrans)) { mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); - canParallel = false; + conflict = true; } else if (mndIsDbTrans(pTrans)) { if (pNewTrans->dbUid == pTrans->dbUid) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); - canParallel = false; + conflict = true; } } else { } @@ -760,12 +760,12 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) { sdbCancelFetch(pMnode->pSdb, pIter); sdbRelease(pMnode->pSdb, pTrans); - return canParallel; + return conflict; } int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { - if (!mndCheckTransCanParallel(pMnode, pTrans)) { - terrno = TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL; + if (mndCheckTransConflict(pMnode, pTrans)) { + terrno = TSDB_CODE_MND_TRANS_CONFLICT; mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); return -1; } @@ -819,7 +819,8 @@ static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { } static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { - bool sendRsp = false; + bool sendRsp = false; + int32_t code = pTrans->code; if (pTrans->stage == TRN_STAGE_FINISHED) { sendRsp = true; @@ -829,12 +830,14 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) { sendRsp = true; + if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; } } if (pTrans->policy == TRN_POLICY_RETRY) { if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 0) { sendRsp = true; + if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; } } @@ -845,13 +848,13 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { } taosMemoryFree(pTrans->rpcRsp); - mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, + mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, code & 0xFFFF, pTrans->stage, pTrans->rpcAHandle); SRpcMsg rspMsg = { .handle = pTrans->rpcHandle, .ahandle = pTrans->rpcAHandle, .refId = pTrans->rpcRefId, - .code = pTrans->code, + .code = code, .pCont = rpcCont, .contLen = pTrans->rpcRspLen, }; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index bd787d50b0..00fe8bd0e9 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -271,7 +271,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL, "Conflicting transaction not completed") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction not completed") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error") // mnode-mq TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists") From 71021334fb6a928f38a0e926010e6ce51c7388b4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 4 May 2022 20:37:37 +0800 Subject: [PATCH 2/3] refactor: adjust transaction code --- source/dnode/mnode/impl/src/mndTrans.c | 12 +++--- source/dnode/mnode/impl/src/mndUser.c | 3 -- source/dnode/mnode/impl/test/trans/trans2.cpp | 39 ++++++++++++++----- 3 files changed, 34 insertions(+), 20 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 1c238d4082..e80a16b16e 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -829,15 +829,13 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { if (pTrans->policy == TRN_POLICY_ROLLBACK) { if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) { - sendRsp = true; if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; + sendRsp = true; } - } - - if (pTrans->policy == TRN_POLICY_RETRY) { + } else { if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 0) { - sendRsp = true; if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; + sendRsp = true; } } @@ -1102,8 +1100,8 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { } else { pTrans->code = terrno; if (pTrans->policy == TRN_POLICY_ROLLBACK) { - pTrans->stage = TRN_STAGE_REDO_ACTION; - mError("trans:%d, stage from commit to redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), + pTrans->stage = TRN_STAGE_UNDO_ACTION; + mError("trans:%d, stage from commit to undoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); continueExec = true; } else { diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 1800d24cf2..5e15bdeb43 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -275,9 +275,6 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate } sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); - char *param = strdup("====> test code to be deleted later <====="); - mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1); - if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp index 252cd105a0..aaf2c649e4 100644 --- a/source/dnode/mnode/impl/test/trans/trans2.cpp +++ b/source/dnode/mnode/impl/test/trans/trans2.cpp @@ -19,7 +19,7 @@ void reportStartup(SMgmtWrapper *pWrapper, const char *name, const char *desc) { class MndTestTrans2 : public ::testing::Test { protected: - static void SetUpTestSuite() { + static void InitLog() { dDebugFlag = 143; vDebugFlag = 0; mDebugFlag = 207; @@ -42,9 +42,9 @@ class MndTestTrans2 : public ::testing::Test { if (taosInitLog("taosdlog", 1) != 0) { printf("failed to init log file\n"); } + } - walInit(); - + static void InitMnode() { static SMsgCb msgCb = {0}; msgCb.reportStartupFp = reportStartup; msgCb.pWrapper = (SMgmtWrapper *)(&msgCb); // hack @@ -63,6 +63,12 @@ class MndTestTrans2 : public ::testing::Test { pMnode = mndOpen(mnodepath, &opt); } + static void SetUpTestSuite() { + InitLog(); + walInit(); + InitMnode(); + } + static void TearDownTestSuite() { mndClose(pMnode); walCleanUp(); @@ -76,11 +82,11 @@ class MndTestTrans2 : public ::testing::Test { void SetUp() override {} void TearDown() override {} - void CreateUser(const char *user) { + int32_t CreateUser(const char *acct, const char *user) { SUserObj userObj = {0}; taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass); tstrncpy(userObj.user, user, TSDB_USER_LEN); - tstrncpy(userObj.acct, "root", TSDB_USER_LEN); + tstrncpy(userObj.acct, acct, TSDB_USER_LEN); userObj.createdTime = taosGetTimestampMs(); userObj.updateTime = userObj.createdTime; userObj.superUser = 1; @@ -94,19 +100,32 @@ class MndTestTrans2 : public ::testing::Test { char *param = strdup("====> test param <====="); mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1); - mndTransPrepare(pMnode, pTrans); + int32_t code = mndTransPrepare(pMnode, pTrans); mndTransDrop(pTrans); + + return code; } }; SMnode *MndTestTrans2::pMnode; TEST_F(MndTestTrans2, 01_CbFunc) { + const char *acct = "root"; + const char *acct_invalid = "root1"; + const char *user1 = "test1"; + const char *user2 = "test2"; + SUserObj *pUser1 = NULL; + SUserObj *pUser2 = NULL; + ASSERT_NE(pMnode, nullptr); - const char *user1 = "test1"; - CreateUser(user1); - - SUserObj *pUser1 = mndAcquireUser(pMnode, user1); + // create user success + EXPECT_EQ(CreateUser(acct, user1), 0); + pUser1 = mndAcquireUser(pMnode, user1); ASSERT_NE(pUser1, nullptr); + + // failed to create user and rollback + EXPECT_EQ(CreateUser(acct_invalid, user2), 0); + pUser2 = mndAcquireUser(pMnode, user2); + ASSERT_EQ(pUser2, nullptr); } From 2f42c2e7933f2231c15f2236fe17b86d70c49fca Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 4 May 2022 22:00:04 +0800 Subject: [PATCH 3/3] refactor: adjust transaction code --- include/common/tglobal.h | 4 ++ source/common/src/tglobal.c | 4 ++ source/common/src/tmsgcb.c | 62 ++++++++++++++++--- source/dnode/mnode/impl/src/mndTrans.c | 6 +- source/dnode/mnode/impl/src/mnode.c | 11 ++-- source/dnode/mnode/impl/test/trans/trans2.cpp | 6 ++ 6 files changed, 75 insertions(+), 18 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index fc3d575317..f253d31963 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -121,6 +121,10 @@ extern char tsCompressor[]; extern int32_t tsDiskCfgNum; extern SDiskCfg tsDiskCfg[]; +// internal +extern int32_t tsTransPullupMs; +extern int32_t tsMaRebalanceMs; + #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index bbf14b2fdc..4470f5b0a3 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -169,6 +169,10 @@ uint32_t tsMaxRange = 500; // max range uint32_t tsCurRange = 100; // range char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR +// internal +int32_t tsTransPullupMs = 6000; +int32_t tsMaRebalanceMs = 2000; + void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) { tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN); tsDiskCfg[index].level = level; diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index c9cbb73884..42612cecb9 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -15,37 +15,83 @@ #define _DEFAULT_SOURCE #include "tmsgcb.h" +#include "taoserror.h" static SMsgCb tsDefaultMsgCb; void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; } int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { - return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq); + PutToQueueFp fp = pMsgCb->queueFps[qtype]; + if (fp != NULL) { + return (*fp)(pMsgCb->pWrapper, pReq); + } else { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } } int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { - return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype); + GetQueueSizeFp fp = pMsgCb->qsizeFp; + if (fp != NULL) { + return (*fp)(pMsgCb->pWrapper, vgId, qtype); + } else { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } } int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) { - return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); + SendReqFp fp = pMsgCb->sendReqFp; + if (fp != NULL) { + return (*fp)(pMsgCb->pWrapper, epSet, pReq); + } else { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } } -void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); } +void tmsgSendRsp(const SRpcMsg* pRsp) { + SendRspFp fp = tsDefaultMsgCb.sendRspFp; + if (fp != NULL) { + return (*fp)(tsDefaultMsgCb.pWrapper, pRsp); + } else { + terrno = TSDB_CODE_INVALID_PTR; + } +} void tmsgSendRedirectRsp(const SRpcMsg* pRsp, const SEpSet* pNewEpSet) { - return (*tsDefaultMsgCb.sendRedirectRspFp)(tsDefaultMsgCb.pWrapper, pRsp, pNewEpSet); + SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp; + if (fp != NULL) { + (*fp)(tsDefaultMsgCb.pWrapper, pRsp, pNewEpSet); + } else { + terrno = TSDB_CODE_INVALID_PTR; + } } void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { - (*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg); + RegisterBrokenLinkArgFp fp = pMsgCb->registerBrokenLinkArgFp; + if (fp != NULL) { + (*fp)(pMsgCb->pWrapper, pMsg); + } else { + terrno = TSDB_CODE_INVALID_PTR; + } } void tmsgReleaseHandle(void* handle, int8_t type) { - (*tsDefaultMsgCb.releaseHandleFp)(tsDefaultMsgCb.pWrapper, handle, type); + ReleaseHandleFp fp = tsDefaultMsgCb.releaseHandleFp; + if (fp != NULL) { + (*fp)(tsDefaultMsgCb.pWrapper, handle, type); + } else { + terrno = TSDB_CODE_INVALID_PTR; + } } void tmsgReportStartup(const char* name, const char* desc) { - (*tsDefaultMsgCb.reportStartupFp)(tsDefaultMsgCb.pWrapper, name, desc); + ReportStartup fp = tsDefaultMsgCb.reportStartupFp; + if (fp != NULL && tsDefaultMsgCb.pWrapper != NULL) { + (*fp)(tsDefaultMsgCb.pWrapper, name, desc); + } else { + terrno = TSDB_CODE_INVALID_PTR; + } } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index e80a16b16e..6110d0be44 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1126,7 +1126,7 @@ static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) { } else { pTrans->code = terrno; pTrans->failedTimes++; - mError("trans:%d, stage keep on commitLog since %s", pTrans->id, terrstr()); + mError("trans:%d, stage keep on commitLog since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); continueExec = false; } @@ -1162,7 +1162,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { continueExec = false; } else { pTrans->failedTimes++; - mError("trans:%d, stage keep on undoAction since %s", pTrans->id, terrstr()); + mError("trans:%d, stage keep on undoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); continueExec = false; } @@ -1179,7 +1179,7 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { continueExec = true; } else { pTrans->failedTimes++; - mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr()); + mError("trans:%d, stage keep on rollback since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); continueExec = false; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index b38fd6178c..e2814e95f0 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -43,9 +43,6 @@ #include "mndUser.h" #include "mndVgroup.h" -#define MQ_TIMER_MS 2000 -#define TRNAS_TIMER_MS 6000 - static void *mndBuildTimerMsg(int32_t *pContLen) { SMTimerReq timerReq = {0}; @@ -68,7 +65,7 @@ static void mndPullupTrans(void *param, void *tmrId) { tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } - taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer); + taosTmrReset(mndPullupTrans, tsTransPullupMs, pMnode, pMnode->timer, &pMnode->transTimer); } static void mndCalMqRebalance(void *param, void *tmrId) { @@ -84,7 +81,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } - taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer); + taosTmrReset(mndCalMqRebalance, tsMaRebalanceMs, pMnode, pMnode->timer, &pMnode->mqTimer); } static void mndPullupTelem(void *param, void *tmrId) { @@ -106,12 +103,12 @@ static int32_t mndInitTimer(SMnode *pMnode) { return -1; } - if (taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer)) { + if (taosTmrReset(mndPullupTrans, tsTransPullupMs, pMnode, pMnode->timer, &pMnode->transTimer)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer)) { + if (taosTmrReset(mndCalMqRebalance, tsMaRebalanceMs, pMnode, pMnode->timer, &pMnode->mqTimer)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp index aaf2c649e4..1fabf677c4 100644 --- a/source/dnode/mnode/impl/test/trans/trans2.cpp +++ b/source/dnode/mnode/impl/test/trans/trans2.cpp @@ -58,9 +58,12 @@ class MndTestTrans2 : public ::testing::Test { strcpy(opt.replicas[0].fqdn, "localhost"); opt.msgCb = msgCb; + tsTransPullupMs = 1000; + const char *mnodepath = "/tmp/mnode_test_trans"; taosRemoveDir(mnodepath); pMnode = mndOpen(mnodepath, &opt); + mndStart(pMnode); } static void SetUpTestSuite() { @@ -70,6 +73,7 @@ class MndTestTrans2 : public ::testing::Test { } static void TearDownTestSuite() { + mndStop(pMnode); mndClose(pMnode); walCleanUp(); taosCloseLog(); @@ -128,4 +132,6 @@ TEST_F(MndTestTrans2, 01_CbFunc) { EXPECT_EQ(CreateUser(acct_invalid, user2), 0); pUser2 = mndAcquireUser(pMnode, user2); ASSERT_EQ(pUser2, nullptr); + + mndTransPullup(pMnode); }