This commit is contained in:
dmchen 2024-07-29 12:06:41 +00:00
parent ef89905560
commit f4c8b6d081
1 changed files with 32 additions and 61 deletions

View File

@ -48,7 +48,7 @@
static inline int32_t mndAcquireRpc(SMnode *pMnode) { static inline int32_t mndAcquireRpc(SMnode *pMnode) {
int32_t code = 0; int32_t code = 0;
TAOS_CHECK_RETURN(taosThreadRwlockRdlock(&pMnode->lock)); (void)taosThreadRwlockRdlock(&pMnode->lock);
if (pMnode->stopped) { if (pMnode->stopped) {
code = TSDB_CODE_APP_IS_STOPPING; code = TSDB_CODE_APP_IS_STOPPING;
} else if (!mndIsLeader(pMnode)) { } else if (!mndIsLeader(pMnode)) {
@ -61,7 +61,7 @@ static inline int32_t mndAcquireRpc(SMnode *pMnode) {
mTrace("mnode rpc is acquired, ref:%d", ref); mTrace("mnode rpc is acquired, ref:%d", ref);
#endif #endif
} }
TAOS_CHECK_RETURN(taosThreadRwlockUnlock(&pMnode->lock)); (void)taosThreadRwlockUnlock(&pMnode->lock);
TAOS_RETURN(code); TAOS_RETURN(code);
} }
@ -91,62 +91,52 @@ static void *mndBuildTimerMsg(int32_t *pContLen) {
} }
static void mndPullupTrans(SMnode *pMnode) { static void mndPullupTrans(SMnode *pMnode) {
int32_t code = 0;
mTrace("pullup trans msg"); mTrace("pullup trans msg");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
if ((code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg)) != 0) { //TODO check return value
mError("failed to put to queue since %s", tstrerror(code)); (void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} }
} }
static void mndPullupCompacts(SMnode *pMnode) { static void mndPullupCompacts(SMnode *pMnode) {
int32_t code = 0;
mTrace("pullup compact timer msg"); mTrace("pullup compact timer msg");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_COMPACT_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_COMPACT_TIMER, .pCont = pReq, .contLen = contLen};
if ((code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg)) != 0) { //TODO check return value
mError("failed to put to queue since %s", tstrerror(code)); (void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} }
} }
static void mndPullupTtl(SMnode *pMnode) { static void mndPullupTtl(SMnode *pMnode) {
int32_t code = 0;
mTrace("pullup ttl"); mTrace("pullup ttl");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
if ((code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg)) != 0) { //TODO check return value
mError("failed to put to queue since %s", tstrerror(code)); (void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} }
static void mndPullupTrimDb(SMnode *pMnode) { static void mndPullupTrimDb(SMnode *pMnode) {
int32_t code = 0;
mTrace("pullup s3migrate"); mTrace("pullup s3migrate");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_S3MIGRATE_DB_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_S3MIGRATE_DB_TIMER, .pCont = pReq, .contLen = contLen};
if ((code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg)) != 0) { // TODO check return value
mError("failed to put to queue since %s", tstrerror(code)); (void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} }
static void mndPullupS3MigrateDb(SMnode *pMnode) { static void mndPullupS3MigrateDb(SMnode *pMnode) {
int32_t code = 0;
mTrace("pullup trim"); mTrace("pullup trim");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRIM_DB_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRIM_DB_TIMER, .pCont = pReq, .contLen = contLen};
if ((code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg)) != 0) { // TODO check return value
mError("failed to put to queue since %s", tstrerror(code)); (void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} }
static int32_t mndPullupArbHeartbeat(SMnode *pMnode) { static int32_t mndPullupArbHeartbeat(SMnode *pMnode) {
@ -166,91 +156,76 @@ static int32_t mndPullupArbCheckSync(SMnode *pMnode) {
} }
static void mndCalMqRebalance(SMnode *pMnode) { static void mndCalMqRebalance(SMnode *pMnode) {
int32_t code = 0;
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen};
if ((code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg)) != 0) { (void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
mError("failed to put to queue since %s", tstrerror(code));
}
} }
} }
static void mndStreamCheckpointTimer(SMnode *pMnode) { static void mndStreamCheckpointTimer(SMnode *pMnode) {
int32_t code = 0;
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
if (pMsg != NULL) { if (pMsg != NULL) {
int32_t size = sizeof(SMStreamDoCheckpointMsg); int32_t size = sizeof(SMStreamDoCheckpointMsg);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
if ((code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg)) != 0) { // TODO check return value
mError("failed to put to queue since %s", tstrerror(code)); (void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} }
} }
static void mndStreamCheckNode(SMnode *pMnode) { static void mndStreamCheckNode(SMnode *pMnode) {
int32_t code = 0;
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_NODECHECK_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_NODECHECK_TIMER, .pCont = pReq, .contLen = contLen};
if ((code = tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg)) != 0) { // TODO check return value
mError("failed to put to queue since %s", tstrerror(code)); (void)tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
} }
} }
static void mndStreamConsensusChkpt(SMnode *pMnode) { static void mndStreamConsensusChkpt(SMnode *pMnode) {
int32_t code = 0;
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CONSEN_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CONSEN_TIMER, .pCont = pReq, .contLen = contLen};
if ((code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg)) != 0) { // TODO check return value
mError("failed to put to queue since %s", tstrerror(code)); (void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} }
} }
static void mndPullupTelem(SMnode *pMnode) { static void mndPullupTelem(SMnode *pMnode) {
int32_t code = 0;
mTrace("pullup telem msg"); mTrace("pullup telem msg");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
if ((code = tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg)) != 0) { // TODO check return value
mError("failed to put to queue since %s", tstrerror(code)); (void)tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
} }
} }
static void mndPullupGrant(SMnode *pMnode) { static void mndPullupGrant(SMnode *pMnode) {
int32_t code = 0;
mTrace("pullup grant msg"); mTrace("pullup grant msg");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527}; .msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527};
if ((code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg)) != 0) { // TODO check return value
mError("failed to put to queue since %s", tstrerror(code)); (void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} }
} }
static void mndIncreaseUpTime(SMnode *pMnode) { static void mndIncreaseUpTime(SMnode *pMnode) {
int32_t code = 0;
mTrace("increate uptime"); mTrace("increate uptime");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528}; .msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528};
if ((code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg)) != 0) { // TODO check return value
mError("failed to put to queue since %s", tstrerror(code)); (void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} }
} }
@ -465,14 +440,14 @@ static void *mndThreadFp(void *param) {
static int32_t mndInitTimer(SMnode *pMnode) { static int32_t mndInitTimer(SMnode *pMnode) {
int32_t code = 0; int32_t code = 0;
TdThreadAttr thAttr; TdThreadAttr thAttr;
TAOS_CHECK_RETURN(taosThreadAttrInit(&thAttr)); (void)taosThreadAttrInit(&thAttr);
TAOS_CHECK_RETURN(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE)); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if ((code = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode)) != 0) { if ((code = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode)) != 0) {
mError("failed to create timer thread since %s", strerror(errno)); mError("failed to create timer thread since %s", strerror(errno));
TAOS_RETURN(code); TAOS_RETURN(code);
} }
TAOS_CHECK_RETURN(taosThreadAttrDestroy(&thAttr)); (void)taosThreadAttrDestroy(&thAttr);
tmsgReportStartup("mnode-timer", "initialized"); tmsgReportStartup("mnode-timer", "initialized");
TAOS_RETURN(code); TAOS_RETURN(code);
} }
@ -739,15 +714,11 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
} }
void mndPreClose(SMnode *pMnode) { void mndPreClose(SMnode *pMnode) {
int32_t code = 0;
if (pMnode != NULL) { if (pMnode != NULL) {
if ((code = syncLeaderTransfer(pMnode->syncMgmt.sync)) != 0) { // TODO check return value
mError("failed to pre close since %s", tstrerror(code)); syncLeaderTransfer(pMnode->syncMgmt.sync);
}
syncPreStop(pMnode->syncMgmt.sync); syncPreStop(pMnode->syncMgmt.sync);
if ((code = sdbWriteFile(pMnode->pSdb, 0)) != 0) { sdbWriteFile(pMnode->pSdb, 0);
mError("failed to pre close since %s", tstrerror(code));
}
} }
} }
@ -826,7 +797,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
} }
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
TAOS_CHECK_RETURN(taosThreadRwlockRdlock(&pMnode->lock)); (void)taosThreadRwlockRdlock(&pMnode->lock);
if (pMnode->stopped) { if (pMnode->stopped) {
(void)taosThreadRwlockUnlock(&pMnode->lock); (void)taosThreadRwlockUnlock(&pMnode->lock);
code = TSDB_CODE_APP_IS_STOPPING; code = TSDB_CODE_APP_IS_STOPPING;