Merge pull request #26858 from taosdata/fix/TD-30989-scan1-16

fix/TD-30989-scan1-16
This commit is contained in:
Hongze Cheng 2024-07-30 09:52:50 +08:00 committed by GitHub
commit 27f8737aa1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 65 additions and 53 deletions

View File

@ -48,32 +48,32 @@
static inline int32_t mndAcquireRpc(SMnode *pMnode) {
int32_t code = 0;
taosThreadRwlockRdlock(&pMnode->lock);
(void)taosThreadRwlockRdlock(&pMnode->lock);
if (pMnode->stopped) {
code = TSDB_CODE_APP_IS_STOPPING;
} else if (!mndIsLeader(pMnode)) {
code = -1;
} else {
#if 1
atomic_add_fetch_32(&pMnode->rpcRef, 1);
(void)atomic_add_fetch_32(&pMnode->rpcRef, 1);
#else
int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
mTrace("mnode rpc is acquired, ref:%d", ref);
#endif
}
taosThreadRwlockUnlock(&pMnode->lock);
(void)taosThreadRwlockUnlock(&pMnode->lock);
TAOS_RETURN(code);
}
static inline void mndReleaseRpc(SMnode *pMnode) {
taosThreadRwlockRdlock(&pMnode->lock);
(void)taosThreadRwlockRdlock(&pMnode->lock);
#if 1
atomic_sub_fetch_32(&pMnode->rpcRef, 1);
(void)atomic_sub_fetch_32(&pMnode->rpcRef, 1);
#else
int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1);
mTrace("mnode rpc is released, ref:%d", ref);
#endif
taosThreadRwlockUnlock(&pMnode->lock);
(void)taosThreadRwlockUnlock(&pMnode->lock);
}
static void *mndBuildTimerMsg(int32_t *pContLen) {
@ -85,7 +85,7 @@ static void *mndBuildTimerMsg(int32_t *pContLen) {
void *pReq = rpcMallocCont(contLen);
if (pReq == NULL) return NULL;
tSerializeSMTimerMsg(pReq, contLen, &timerReq);
(void)tSerializeSMTimerMsg(pReq, contLen, &timerReq);
*pContLen = contLen;
return pReq;
}
@ -96,7 +96,8 @@ static void mndPullupTrans(SMnode *pMnode) {
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
//TODO check return value
(void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}
@ -106,7 +107,8 @@ static void mndPullupCompacts(SMnode *pMnode) {
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_COMPACT_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
//TODO check return value
(void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}
@ -115,7 +117,8 @@ static void mndPullupTtl(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
//TODO check return value
(void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
static void mndPullupTrimDb(SMnode *pMnode) {
@ -123,7 +126,8 @@ static void mndPullupTrimDb(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_S3MIGRATE_DB_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
// TODO check return value
(void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
static void mndPullupS3MigrateDb(SMnode *pMnode) {
@ -131,7 +135,8 @@ static void mndPullupS3MigrateDb(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRIM_DB_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
// TODO check return value
(void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
static int32_t mndPullupArbHeartbeat(SMnode *pMnode) {
@ -155,7 +160,7 @@ static void mndCalMqRebalance(SMnode *pMnode) {
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
(void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}
@ -164,7 +169,8 @@ static void mndStreamCheckpointTimer(SMnode *pMnode) {
if (pMsg != NULL) {
int32_t size = sizeof(SMStreamDoCheckpointMsg);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
// TODO check return value
(void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}
@ -173,7 +179,8 @@ static void mndStreamCheckNode(SMnode *pMnode) {
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_NODECHECK_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
// TODO check return value
(void)tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
@ -182,7 +189,8 @@ static void mndStreamConsensusChkpt(SMnode *pMnode) {
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CONSEN_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
// TODO check return value
(void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}
@ -192,7 +200,8 @@ static void mndPullupTelem(SMnode *pMnode) {
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
// TODO check return value
(void)tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
@ -203,7 +212,8 @@ static void mndPullupGrant(SMnode *pMnode) {
if (pReq != NULL) {
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
// TODO check return value
(void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}
@ -214,7 +224,8 @@ static void mndIncreaseUpTime(SMnode *pMnode) {
if (pReq != NULL) {
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
// TODO check return value
(void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}
@ -287,24 +298,24 @@ static void mndCheckDnodeOffline(SMnode *pMnode) {
static bool mnodeIsNotLeader(SMnode *pMnode) {
terrno = 0;
taosThreadRwlockRdlock(&pMnode->lock);
(void)taosThreadRwlockRdlock(&pMnode->lock);
SSyncState state = syncGetState(pMnode->syncMgmt.sync);
if (terrno != 0) {
taosThreadRwlockUnlock(&pMnode->lock);
(void)taosThreadRwlockUnlock(&pMnode->lock);
return true;
}
if (state.state != TAOS_SYNC_STATE_LEADER) {
taosThreadRwlockUnlock(&pMnode->lock);
(void)taosThreadRwlockUnlock(&pMnode->lock);
terrno = TSDB_CODE_SYN_NOT_LEADER;
return true;
}
if (!state.restored || !pMnode->restored) {
taosThreadRwlockUnlock(&pMnode->lock);
(void)taosThreadRwlockUnlock(&pMnode->lock);
terrno = TSDB_CODE_SYN_RESTORING;
return true;
}
taosThreadRwlockUnlock(&pMnode->lock);
(void)taosThreadRwlockUnlock(&pMnode->lock);
return false;
}
@ -429,21 +440,21 @@ static void *mndThreadFp(void *param) {
static int32_t mndInitTimer(SMnode *pMnode) {
int32_t code = 0;
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
(void)taosThreadAttrInit(&thAttr);
(void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if ((code = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode)) != 0) {
mError("failed to create timer thread since %s", strerror(errno));
TAOS_RETURN(code);
}
taosThreadAttrDestroy(&thAttr);
(void)taosThreadAttrDestroy(&thAttr);
tmsgReportStartup("mnode-timer", "initialized");
TAOS_RETURN(code);
}
static void mndCleanupTimer(SMnode *pMnode) {
if (taosCheckPthreadValid(pMnode->thread)) {
taosThreadJoin(pMnode->thread, NULL);
(void)taosThreadJoin(pMnode->thread, NULL);
taosThreadClear(&pMnode->thread);
}
}
@ -467,7 +478,7 @@ static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
static int32_t mndInitWal(SMnode *pMnode) {
int32_t code = 0;
char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
(void)snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
SWalCfg cfg = {.vgId = 1,
.fsyncPeriod = 0,
.rollPeriod = -1,
@ -486,7 +497,7 @@ static int32_t mndInitWal(SMnode *pMnode) {
TAOS_RETURN(code);
}
else{
strncpy(cfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
(void)strncpy(cfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
}
}
#endif
@ -642,8 +653,8 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->syncMgmt.numOfReplicas = pOption->numOfReplicas;
pMnode->syncMgmt.numOfTotalReplicas = pOption->numOfTotalReplicas;
pMnode->syncMgmt.lastIndex = pOption->lastIndex;
memcpy(pMnode->syncMgmt.replicas, pOption->replicas, sizeof(pOption->replicas));
memcpy(pMnode->syncMgmt.nodeRoles, pOption->nodeRoles, sizeof(pOption->nodeRoles));
(void)memcpy(pMnode->syncMgmt.replicas, pOption->replicas, sizeof(pOption->replicas));
(void)memcpy(pMnode->syncMgmt.nodeRoles, pOption->nodeRoles, sizeof(pOption->nodeRoles));
}
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
@ -656,7 +667,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
mError("failed to open mnode since %s", terrstr());
return NULL;
}
memset(pMnode, 0, sizeof(SMnode));
(void)memset(pMnode, 0, sizeof(SMnode));
char timestr[24] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
@ -704,6 +715,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
void mndPreClose(SMnode *pMnode) {
if (pMnode != NULL) {
// TODO check return value
syncLeaderTransfer(pMnode->syncMgmt.sync);
syncPreStop(pMnode->syncMgmt.sync);
sdbWriteFile(pMnode->pSdb, 0);
@ -785,9 +797,9 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
}
SMnode *pMnode = pMsg->info.node;
taosThreadRwlockRdlock(&pMnode->lock);
(void)taosThreadRwlockRdlock(&pMnode->lock);
if (pMnode->stopped) {
taosThreadRwlockUnlock(&pMnode->lock);
(void)taosThreadRwlockUnlock(&pMnode->lock);
code = TSDB_CODE_APP_IS_STOPPING;
TAOS_RETURN(code);
}
@ -795,31 +807,31 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
terrno = 0;
SSyncState state = syncGetState(pMnode->syncMgmt.sync);
if (terrno != 0) {
taosThreadRwlockUnlock(&pMnode->lock);
(void)taosThreadRwlockUnlock(&pMnode->lock);
code = terrno;
TAOS_RETURN(code);
}
if (state.state != TAOS_SYNC_STATE_LEADER) {
taosThreadRwlockUnlock(&pMnode->lock);
(void)taosThreadRwlockUnlock(&pMnode->lock);
code = TSDB_CODE_SYN_NOT_LEADER;
goto _OVER;
}
if (!state.restored || !pMnode->restored) {
taosThreadRwlockUnlock(&pMnode->lock);
(void)taosThreadRwlockUnlock(&pMnode->lock);
code = TSDB_CODE_SYN_RESTORING;
goto _OVER;
}
#if 1
atomic_add_fetch_32(&pMnode->rpcRef, 1);
(void)atomic_add_fetch_32(&pMnode->rpcRef, 1);
#else
int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
mTrace("mnode rpc is acquired, ref:%d", ref);
#endif
taosThreadRwlockUnlock(&pMnode->lock);
(void)taosThreadRwlockUnlock(&pMnode->lock);
TAOS_RETURN(code);
_OVER:
@ -854,7 +866,7 @@ _OVER:
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->info.rsp = rpcMallocCont(contLen);
if (pMsg->info.rsp != NULL) {
tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
(void)tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
pMsg->info.hasEpSet = 1;
pMsg->info.rspLen = contLen;
}
@ -973,7 +985,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
} else {
tstrncpy(desc.status, "offline", sizeof(desc.status));
}
taosArrayPush(pClusterInfo->dnodes, &desc);
(void)taosArrayPush(pClusterInfo->dnodes, &desc);
sdbRelease(pSdb, pObj);
}
@ -999,7 +1011,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
tstrncpy(desc.role, syncStr(pObj->syncState), sizeof(desc.role));
desc.syncState = pObj->syncState;
}
taosArrayPush(pClusterInfo->mnodes, &desc);
(void)taosArrayPush(pClusterInfo->mnodes, &desc);
sdbRelease(pSdb, pObj);
}
@ -1017,8 +1029,8 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
desc.vgroup_id = pVgroup->vgId;
SName name = {0};
tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tNameGetDbName(&name, desc.database_name);
(void)tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
(void)tNameGetDbName(&name, desc.database_name);
desc.tables_num = pVgroup->numOfTables;
pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
@ -1039,7 +1051,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
pClusterInfo->vnodes_total++;
}
taosArrayPush(pVgroupInfo->vgroups, &desc);
(void)taosArrayPush(pVgroupInfo->vgroups, &desc);
sdbRelease(pSdb, pVgroup);
}
@ -1053,14 +1065,14 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
SMonStbDesc desc = {0};
SName name1 = {0};
tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tNameGetDbName(&name1, desc.database_name);
(void)tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
(void)tNameGetDbName(&name1, desc.database_name);
SName name2 = {0};
tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
(void)tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);
taosArrayPush(pStbInfo->stbs, &desc);
(void)taosArrayPush(pStbInfo->stbs, &desc);
sdbRelease(pSdb, pStb);
}
@ -1096,12 +1108,12 @@ void mndSetRestored(SMnode *pMnode, bool restored) {
if (restored) {
taosThreadRwlockWrlock(&pMnode->lock);
pMnode->restored = true;
taosThreadRwlockUnlock(&pMnode->lock);
(void)taosThreadRwlockUnlock(&pMnode->lock);
mInfo("mnode set restored:%d", restored);
} else {
taosThreadRwlockWrlock(&pMnode->lock);
pMnode->restored = false;
taosThreadRwlockUnlock(&pMnode->lock);
(void)taosThreadRwlockUnlock(&pMnode->lock);
mInfo("mnode set restored:%d", restored);
while (1) {
if (pMnode->rpcRef <= 0) break;
@ -1115,7 +1127,7 @@ bool mndGetRestored(SMnode *pMnode) { return pMnode->restored; }
void mndSetStop(SMnode *pMnode) {
taosThreadRwlockWrlock(&pMnode->lock);
pMnode->stopped = true;
taosThreadRwlockUnlock(&pMnode->lock);
(void)taosThreadRwlockUnlock(&pMnode->lock);
mInfo("mnode set stopped");
}