From 80e0b830fe09470d2bd75bc24809723fbabfb0db Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 17 Jun 2022 10:40:42 +0800 Subject: [PATCH 1/4] refactor: remove duplicate conninfo in SRpcMsg --- include/libs/transport/trpc.h | 5 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 25 +------ source/dnode/mnode/impl/src/mndAuth.c | 2 +- source/dnode/mnode/impl/src/mndBnode.c | 4 +- source/dnode/mnode/impl/src/mndDb.c | 12 ++-- source/dnode/mnode/impl/src/mndDnode.c | 4 +- source/dnode/mnode/impl/src/mndFunc.c | 4 +- source/dnode/mnode/impl/src/mndMnode.c | 4 +- source/dnode/mnode/impl/src/mndProfile.c | 67 ++++++++++--------- source/dnode/mnode/impl/src/mndQnode.c | 4 +- source/dnode/mnode/impl/src/mndShow.c | 2 +- source/dnode/mnode/impl/src/mndSma.c | 4 +- source/dnode/mnode/impl/src/mndSnode.c | 4 +- source/dnode/mnode/impl/src/mndStb.c | 6 +- source/dnode/mnode/impl/src/mndStream.c | 8 +-- source/dnode/mnode/impl/src/mndTopic.c | 2 +- source/dnode/mnode/impl/src/mndTrans.c | 2 +- source/dnode/mnode/impl/src/mndUser.c | 8 +-- source/dnode/mnode/impl/src/mndVgroup.c | 6 +- source/dnode/vnode/src/vnd/vnodeSync.c | 6 +- source/libs/transport/src/transSvr.c | 2 +- 21 files changed, 82 insertions(+), 99 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index fd57eef83a..e3db1550a0 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -46,7 +46,6 @@ typedef struct SRpcHandleInfo { int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp); int32_t persistHandle; // persist handle or not - SRpcConnInfo connInfo; // app info void *ahandle; // app handle set by client void *wrapper; // wrapper handle @@ -55,6 +54,9 @@ typedef struct SRpcHandleInfo { // resp info void * rsp; int32_t rspLen; + + // conn info + SRpcConnInfo conn; } SRpcHandleInfo; typedef struct SRpcMsg { @@ -63,7 +65,6 @@ typedef struct SRpcMsg { int32_t contLen; int32_t code; SRpcHandleInfo info; - SRpcConnInfo conn; } SRpcMsg; typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index b66e559370..13f2452e66 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -21,21 +21,6 @@ static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet); static void dmSendRsp(SRpcMsg *pMsg); static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg); -static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) { - SRpcConnInfo *pConnInfo = &(pRpc->info.connInfo); - // if (IsReq(pRpc)) { - // terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - // dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle); - // return -1; - //} - - memcpy(pMsg, pRpc, sizeof(SRpcMsg)); - memcpy(pMsg->conn.user, pConnInfo->user, TSDB_USER_LEN); - pMsg->conn.clientIp = pConnInfo->clientIp; - pMsg->conn.clientPort = pConnInfo->clientPort; - return 0; -} - int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)]; if (msgFp == NULL) { @@ -116,14 +101,10 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); - if (pMsg == NULL) { - goto _OVER; - } - dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); + if (pMsg == NULL) goto _OVER; + memcpy(pMsg, pRpc, sizeof(SRpcMsg)); - if (dmBuildNodeMsg(pMsg, pRpc) != 0) { - goto _OVER; - } + dTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle); if (InParentProc(pWrapper)) { code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ); diff --git a/source/dnode/mnode/impl/src/mndAuth.c b/source/dnode/mnode/impl/src/mndAuth.c index f036fc48f7..d47fb9dfb4 100644 --- a/source/dnode/mnode/impl/src/mndAuth.c +++ b/source/dnode/mnode/impl/src/mndAuth.c @@ -56,7 +56,7 @@ static int32_t mndProcessAuthReq(SRpcMsg *pReq) { memcpy(authRsp.user, authReq.user, TSDB_USER_LEN); int32_t code = mndRetriveAuth(pReq->info.node, &authRsp); - mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->conn.user, authRsp.spi, authRsp.encrypt, + mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->info.conn.user, authRsp.spi, authRsp.encrypt, authRsp.user); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authRsp); diff --git a/source/dnode/mnode/impl/src/mndBnode.c b/source/dnode/mnode/impl/src/mndBnode.c index aa908b983d..0de40ca671 100644 --- a/source/dnode/mnode/impl/src/mndBnode.c +++ b/source/dnode/mnode/impl/src/mndBnode.c @@ -292,7 +292,7 @@ static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_BNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_BNODE) != 0) { goto _OVER; } @@ -394,7 +394,7 @@ static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_BNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_BNODE) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index c12c6d5b4c..2eeff9cb33 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -521,12 +521,12 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) { goto _OVER; } - pUser = mndAcquireUser(pMnode, pReq->conn.user); + pUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pUser == NULL) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_DB, NULL) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DB, NULL) != 0) { goto _OVER; } @@ -700,7 +700,7 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_ALTER_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_ALTER_DB, pDb) != 0) { goto _OVER; } @@ -980,7 +980,7 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) { } } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_DROP_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_DB, pDb) != 0) { goto _OVER; } @@ -1127,7 +1127,7 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) { mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr()); } else { - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_USE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_USE_DB, pDb) != 0) { goto _OVER; } @@ -1252,7 +1252,7 @@ static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_COMPACT_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB, pDb) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 6d37143fc9..d1bed90175 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -522,7 +522,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_DNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE) != 0) { goto _OVER; } @@ -623,7 +623,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { } } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_MNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index dfdc0a3c1a..832f1b8e68 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -318,7 +318,7 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_FUNC) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_FUNC) != 0) { goto _OVER; } @@ -365,7 +365,7 @@ static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) { } } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_FUNC) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_FUNC) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index a636f34f3d..b40cd713e5 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -414,7 +414,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_MNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE) != 0) { goto _OVER; } @@ -621,7 +621,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_MNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 20d94e341c..39d21aad49 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -133,7 +133,7 @@ static void mndFreeConn(SConnObj *pConn) { taosWLockLatch(&pConn->queryLock); taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc); taosWUnLockLatch(&pConn->queryLock); - + mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn); } @@ -194,15 +194,15 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { goto CONN_OVER; } - taosIp2String(pReq->conn.clientIp, ip); + taosIp2String(pReq->info.conn.clientIp, ip); - pUser = mndAcquireUser(pMnode, pReq->conn.user); + pUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pUser == NULL) { - mError("user:%s, failed to login while acquire user since %s", pReq->conn.user, terrstr()); + mError("user:%s, failed to login while acquire user since %s", pReq->info.conn.user, terrstr()); goto CONN_OVER; } if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) { - mError("user:%s, failed to auth while acquire user, input:%s", pReq->conn.user, connReq.passwd); + mError("user:%s, failed to auth while acquire user, input:%s", pReq->info.conn.user, connReq.passwd); code = TSDB_CODE_RPC_AUTH_FAILURE; goto CONN_OVER; } @@ -213,15 +213,16 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { pDb = mndAcquireDb(pMnode, db); if (pDb == NULL) { terrno = TSDB_CODE_MND_INVALID_DB; - mError("user:%s, failed to login from %s while use db:%s since %s", pReq->conn.user, ip, connReq.db, terrstr()); + mError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db, + terrstr()); goto CONN_OVER; } } - pConn = mndCreateConn(pMnode, pReq->conn.user, connReq.connType, pReq->conn.clientIp, pReq->conn.clientPort, - connReq.pid, connReq.app, connReq.startTime); + pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp, + pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime); if (pConn == NULL) { - mError("user:%s, failed to login from %s while create connection since %s", pReq->conn.user, ip, terrstr()); + mError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr()); goto CONN_OVER; } @@ -246,7 +247,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { pReq->info.rspLen = contLen; pReq->info.rsp = pRsp; - mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->conn.user, ip, pConn->port, pConn->id, connReq.app); + mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app); code = 0; @@ -348,7 +349,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb if (pHbReq->query) { SQueryHbReqBasic *pBasic = pHbReq->query; - SRpcConnInfo connInfo = pMsg->conn; + SRpcConnInfo connInfo = pMsg->info.conn; SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId); if (pConn == NULL) { @@ -361,7 +362,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id); } } - + SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic)); if (rspBasic == NULL) { mndReleaseConn(pMnode, pConn); @@ -386,7 +387,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb mndGetMnodeEpSet(pMnode, &rspBasic->epSet); mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1); - + mndReleaseConn(pMnode, pConn); hbRsp.query = rspBasic; @@ -500,7 +501,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SProfileMgmt *pMgmt = &pMnode->profileMgmt; - SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user); + SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pUser == NULL) return 0; if (!pUser->superUser) { mndReleaseUser(pMnode, pUser); @@ -523,7 +524,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) { terrno = TSDB_CODE_MND_INVALID_CONN_ID; return -1; } else { - mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->conn.user); + mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->info.conn.user); pConn->killId = killReq.queryId; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); return 0; @@ -534,7 +535,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SProfileMgmt *pMgmt = &pMnode->profileMgmt; - SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user); + SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pUser == NULL) return 0; if (!pUser->superUser) { mndReleaseUser(pMnode, pUser); @@ -555,7 +556,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) { terrno = TSDB_CODE_MND_INVALID_CONN_ID; return -1; } else { - mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->conn.user); + mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->info.conn.user); pConn->killed = 1; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); return TSDB_CODE_SUCCESS; @@ -563,12 +564,12 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) { } static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - int32_t numOfRows = 0; - int32_t cols = 0; - SConnObj *pConn = NULL; - + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + int32_t cols = 0; + SConnObj *pConn = NULL; + if (pShow->pIter == NULL) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; pShow->pIter = taosCacheCreateIter(pMgmt->cache); @@ -619,12 +620,12 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl } static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - int32_t numOfRows = 0; - int32_t cols = 0; - SConnObj *pConn = NULL; - + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + int32_t cols = 0; + SConnObj *pConn = NULL; + if (pShow->pIter == NULL) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; pShow->pIter = taosCacheCreateIter(pMgmt->cache); @@ -645,7 +646,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p int32_t numOfQueries = taosArrayGetSize(pConn->pQueries); for (int32_t i = 0; i < numOfQueries; ++i) { - SQueryDesc* pQuery = taosArrayGet(pConn->pQueries, i); + SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i); cols = 0; char queryId[26 + VARSTR_HEADER_SIZE] = {0}; @@ -691,14 +692,14 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false); - char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0}; + char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0}; int32_t strSize = sizeof(subStatus); int32_t offset = VARSTR_HEADER_SIZE; for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) { if (i) { offset += snprintf(subStatus + offset, strSize - offset - 1, ","); } - SQuerySubDesc* pDesc = taosArrayGet(pQuery->subDesc, i); + SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i); offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status); } varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]); @@ -712,7 +713,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p numOfRows++; } - + taosRUnLockLatch(&pConn->queryLock); } diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 595287a3af..f5625f32d5 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -294,7 +294,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_QNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_QNODE) != 0) { goto _OVER; } @@ -396,7 +396,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_QNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_QNODE) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index d312955202..a6447ed405 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -229,7 +229,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type); - // if (mndCheckShowAuth(pMnode, pReq->conn.user, pShow->type) != 0) return -1; + // if (mndCheckShowAuth(pMnode, pReq->info.conn.user, pShow->type) != 0) return -1; int32_t numOfCols = pShow->pMeta->numOfColumns; SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 3eb5b03efd..023a28ce35 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -710,7 +710,7 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } @@ -971,7 +971,7 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index c84dc2f3dd..0a99f356b1 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -300,7 +300,7 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_SNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_SNODE) != 0) { goto _OVER; } @@ -404,7 +404,7 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_SNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_SNODE) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index b83aa34a3a..b8b22cee85 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -805,7 +805,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } @@ -1454,7 +1454,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } @@ -1584,7 +1584,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3a42c694c8..8e82946d68 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -545,7 +545,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq return -1; } - if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) { + if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) { mError("trans:%d, failed to create stb for stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; @@ -602,7 +602,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } #endif @@ -627,7 +627,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } // create stb for stream - if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) { + if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) { mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr()); mndTransDrop(pTrans); goto _OVER; @@ -696,7 +696,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { #if 0 // todo check auth - pUser = mndAcquireUser(pMnode, pReq->conn.user); + pUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pUser == NULL) { goto DROP_STREAM_OVER; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 4c2730ce94..8afb7ab354 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -474,7 +474,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index dafb2fa1d1..b3a2888535 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1369,7 +1369,7 @@ static int32_t mndProcessKillTransReq(SRpcMsg *pReq) { mInfo("trans:%d, start to kill", killReq.transId); - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_KILL_TRANS) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_KILL_TRANS) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 9590823106..eb0a818a60 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -354,13 +354,13 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) { goto _OVER; } - pOperUser = mndAcquireUser(pMnode, pReq->conn.user); + pOperUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pOperUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_USER) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_USER) != 0) { goto _OVER; } @@ -460,7 +460,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { goto _OVER; } - pOperUser = mndAcquireUser(pMnode, pReq->conn.user); + pOperUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pOperUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; goto _OVER; @@ -643,7 +643,7 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_USER) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_USER) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 755f4ef0b2..cec83d1af5 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1200,7 +1200,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) { mInfo("vgId:%d, start to redistribute to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3); - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER; + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER; pVgroup = mndAcquireVgroup(pMnode, req.vgId); if (pVgroup == NULL) goto _OVER; @@ -1500,7 +1500,7 @@ static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { pDb = mndAcquireDb(pMnode, pVgroup->dbName); if (pDb == NULL) goto _OVER; - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_SPLIT_VGROUP) != 0) goto _OVER; + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_SPLIT_VGROUP) != 0) goto _OVER; code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; @@ -1624,7 +1624,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) { mDebug("start to balance vgroup"); - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_BALANCE_VGROUP) != 0) goto _OVER; + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_BALANCE_VGROUP) != 0) goto _OVER; while (1) { SDnodeObj *pDnode = NULL; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 8662c8369b..6a1f5c606d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -184,7 +184,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; if (rsp.code == 0) { - if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->conn.applyIndex, &rsp) < 0) { + if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) { rsp.code = terrno; vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr()); } @@ -329,7 +329,7 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) { SVnode *pVnode = pFsm->data; - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index}; + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .info.conn.applyIndex = cbMeta.index}; syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); vInfo("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode), @@ -359,7 +359,7 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index}; + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .info.conn.applyIndex = cbMeta.index}; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index b6e3fd2676..bd3781f9c7 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -310,7 +310,7 @@ static void uvHandleReq(SSvrConn* pConn) { } // set up conn info - SRpcConnInfo* pConnInfo = &(transMsg.info.connInfo); + SRpcConnInfo* pConnInfo = &(transMsg.info.conn); pConnInfo->clientIp = (uint32_t)(pConn->addr.sin_addr.s_addr); pConnInfo->clientPort = ntohs(pConn->addr.sin_port); tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); From 62f21b528eb844e5f06cd82e09ad098b720e1434 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 17 Jun 2022 10:52:45 +0800 Subject: [PATCH 2/4] refactor: remove duplicate conninfo in SRpcMsg --- source/dnode/vnode/src/vnd/vnodeSync.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 6a1f5c606d..f46114746e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -329,8 +329,9 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) { SVnode *pVnode = pFsm->data; - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .info.conn.applyIndex = cbMeta.index}; + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); + rpcMsg.info.conn.applyIndex = cbMeta.index; vInfo("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle); @@ -359,10 +360,11 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .info.conn.applyIndex = cbMeta.index}; + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); + rpcMsg.info.conn.applyIndex = cbMeta.index; tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); } else { From f5db109e61c44f92e1d2e75edceac99057be579a Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 17 Jun 2022 11:37:10 +0800 Subject: [PATCH 3/4] fix(sync): get last max snapshot index --- include/libs/sync/sync.h | 1 + source/dnode/mnode/impl/src/mndSync.c | 8 +++-- source/dnode/mnode/impl/src/mndTrans.c | 8 +++-- source/libs/sync/inc/syncRaftCfg.h | 6 ++++ source/libs/sync/src/syncMain.c | 32 +++++++++++++++--- source/libs/sync/src/syncRaftCfg.c | 41 ++++++++++++++++++----- source/libs/sync/src/syncRaftLog.c | 26 +++++++------- source/libs/sync/src/syncSnapshot.c | 10 +++--- source/libs/sync/test/syncRaftCfgTest.cpp | 19 ++++++++++- 9 files changed, 114 insertions(+), 37 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index fc9d419b1f..f49030466e 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -192,6 +192,7 @@ bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); +int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 3a243e36d3..388a9d82c4 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -69,7 +69,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM if (cbMeta.index - sdbGetApplyIndex(pMnode->pSdb) > 100) { SSnapshotMeta sMeta = {0}; - if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { + // if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { + if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, cbMeta.index, &sMeta) == 0) { sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex); } sdbWriteFile(pMnode->pSdb); @@ -89,7 +90,10 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { SMnode *pMnode = pFsm->data; SSnapshotMeta sMeta = {0}; - if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { + // if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { + + SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb); + if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) { sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex); } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 1631c9825b..a50d8da37e 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -22,8 +22,8 @@ #include "mndSync.h" #include "mndUser.h" -#define TRANS_VER_NUMBER 1 -#define TRANS_ARRAY_SIZE 8 +#define TRANS_VER_NUMBER 1 +#define TRANS_ARRAY_SIZE 8 #define TRANS_RESERVE_SIZE 64 static SSdbRaw *mndTransActionEncode(STrans *pTrans); @@ -1426,7 +1426,9 @@ void mndTransPullup(SMnode *pMnode) { } SSnapshotMeta sMeta = {0}; - if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { + // if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { + SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb); + if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) { sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex); } sdbWriteFile(pMnode->pSdb); diff --git a/source/libs/sync/inc/syncRaftCfg.h b/source/libs/sync/inc/syncRaftCfg.h index e72e1e7be7..cd64402738 100644 --- a/source/libs/sync/inc/syncRaftCfg.h +++ b/source/libs/sync/inc/syncRaftCfg.h @@ -29,6 +29,8 @@ extern "C" { #define CONFIG_FILE_LEN 1024 +#define MAX_CONFIG_INDEX_COUNT 512 + typedef struct SRaftCfg { SSyncCfg cfg; TdFilePtr pFile; @@ -36,6 +38,10 @@ typedef struct SRaftCfg { int8_t isStandBy; int8_t snapshotEnable; SyncIndex lastConfigIndex; + + SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT]; + int32_t configIndexCount; + } SRaftCfg; SRaftCfg *raftCfgOpen(const char *path); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 00786e5015..278288eb1e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -418,6 +418,29 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { return 0; } +int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return -1; + } + assert(rid == pSyncNode->rid); + + ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1); + SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0]; + + for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) { + if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex && + (pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotIndex) { + lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i]; + } + } + sMeta->lastConfigIndex = lastIndex; + sTrace("sync get snapshot meta by index:%ld lastConfigIndex:%ld", snapshotIndex, sMeta->lastConfigIndex); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; +} + const char* syncGetMyRoleStr(int64_t rid) { const char* s = syncUtilState2String(syncGetMyRole(rid)); return s; @@ -2195,8 +2218,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE char* newStr = syncCfg2Str(&newSyncCfg); syncUtilJson2Line(oldStr); syncUtilJson2Line(newStr); - snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, %s --> %s", oldSyncCfg.replicaNum, - newSyncCfg.replicaNum, oldStr, newStr); + snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum, + newSyncCfg.replicaNum, pEntry->index, oldStr, newStr); taosMemoryFree(oldStr); taosMemoryFree(newStr); @@ -2212,8 +2235,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE char* newStr = syncCfg2Str(&newSyncCfg); syncUtilJson2Line(oldStr); syncUtilJson2Line(newStr); - snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, %s --> %s", oldSyncCfg.replicaNum, - newSyncCfg.replicaNum, oldStr, newStr); + snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum, + newSyncCfg.replicaNum, pEntry->index, oldStr, newStr); taosMemoryFree(oldStr); taosMemoryFree(newStr); @@ -2295,6 +2318,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, } // restore finish + // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { if (ths->restoreFinish == false) { if (ths->pFsm->FpRestoreFinishCb != NULL) { diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 45e00aca2c..2d51f1f6f0 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -85,16 +85,11 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { } return pRoot; - /* - cJSON *pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SSyncCfg", pRoot); - return pJson; - */ } char *syncCfg2Str(SSyncCfg *pSyncCfg) { cJSON *pJson = syncCfg2Json(pSyncCfg); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -154,6 +149,16 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex); cJSON_AddStringToObject(pRoot, "lastConfigIndex", buf64); + cJSON_AddNumberToObject(pRoot, "configIndexCount", pRaftCfg->configIndexCount); + cJSON *pIndexArr = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "configIndexArr", pIndexArr); + for (int i = 0; i < pRaftCfg->configIndexCount; ++i) { + snprintf(buf64, sizeof(buf64), "%ld", (pRaftCfg->configIndexArr)[i]); + cJSON *pIndexObj = cJSON_CreateObject(); + cJSON_AddStringToObject(pIndexObj, "index", buf64); + cJSON_AddItemToArray(pIndexArr, pIndexObj); + } + cJSON *pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "RaftCfg", pRoot); return pJson; @@ -161,7 +166,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) { cJSON *pJson = raftCfg2Json(pRaftCfg); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -177,6 +182,9 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { raftCfg.isStandBy = meta.isStandBy; raftCfg.snapshotEnable = meta.snapshotEnable; raftCfg.lastConfigIndex = meta.lastConfigIndex; + raftCfg.configIndexCount = 1; + memset(raftCfg.configIndexArr, 0, sizeof(raftCfg.configIndexArr)); + raftCfg.configIndexArr[0] = -1; char *s = raftCfg2Str(&raftCfg); char buf[CONFIG_FILE_LEN] = {0}; @@ -207,7 +215,24 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex"); pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex)); - cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); + cJSON *pJsonConfigIndexCount = cJSON_GetObjectItem(pJson, "configIndexCount"); + pRaftCfg->configIndexCount = cJSON_GetNumberValue(pJsonConfigIndexCount); + + cJSON *pIndexArr = cJSON_GetObjectItem(pJson, "configIndexArr"); + int arraySize = cJSON_GetArraySize(pIndexArr); + assert(arraySize == pRaftCfg->configIndexCount); + + memset(pRaftCfg->configIndexArr, 0, sizeof(pRaftCfg->configIndexArr)); + for (int i = 0; i < arraySize; ++i) { + cJSON *pIndexObj = cJSON_GetArrayItem(pIndexArr, i); + assert(pIndexObj != NULL); + + cJSON *pIndex = cJSON_GetObjectItem(pIndexObj, "index"); + assert(cJSON_IsString(pIndex)); + (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); + } + + cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); ASSERT(code == 0); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 7c6d97a96e..87dfef5fdd 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -410,20 +410,18 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { } int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { - /* - SSyncLogStoreData* pData = pLogStore->data; - SWal* pWal = pData->pWal; - // assert(walCommit(pWal, index) == 0); - int32_t code = walCommit(pWal, index); - if (code != 0) { - int32_t err = terrno; - const char* errStr = tstrerror(err); - int32_t linuxErr = errno; - const char* linuxErrMsg = strerror(errno); - sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, - linuxErrMsg); ASSERT(0); - } - */ + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + // assert(walCommit(pWal, index) == 0); + int32_t code = walCommit(pWal, index); + if (code != 0) { + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t linuxErr = errno; + const char* linuxErrMsg = strerror(errno); + sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); + ASSERT(0); + } return 0; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index ef2011cc26..4973d36295 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -421,7 +421,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -542,7 +542,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON *pTmp = pFromId; + cJSON * pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -566,7 +566,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -671,8 +671,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { char *newStr = syncCfg2Str(&newSyncCfg); syncUtilJson2Line(oldStr); syncUtilJson2Line(newStr); - snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, %s --> %s", oldSyncCfg.replicaNum, - newSyncCfg.replicaNum, oldStr, newStr); + snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, index:%ld, %s --> %s", + oldSyncCfg.replicaNum, newSyncCfg.replicaNum, pMsg->lastConfigIndex, oldStr, newStr); taosMemoryFree(oldStr); taosMemoryFree(newStr); diff --git a/source/libs/sync/test/syncRaftCfgTest.cpp b/source/libs/sync/test/syncRaftCfgTest.cpp index 8c6a704e2d..9460f0ad7a 100644 --- a/source/libs/sync/test/syncRaftCfgTest.cpp +++ b/source/libs/sync/test/syncRaftCfgTest.cpp @@ -27,6 +27,14 @@ SRaftCfg* createRaftCfg() { } pCfg->isStandBy = taosGetTimestampSec() % 100; + pCfg->configIndexCount = 5; + for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) { + (pCfg->configIndexArr)[i] = -1; + } + for (int i = 0; i < pCfg->configIndexCount; ++i) { + (pCfg->configIndexArr)[i] = i * 100; + } + return pCfg; } @@ -100,6 +108,15 @@ void test5() { pCfg->isStandBy += 2; pCfg->snapshotEnable += 3; pCfg->lastConfigIndex += 1000; + + pCfg->configIndexCount = 5; + for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) { + (pCfg->configIndexArr)[i] = -1; + } + for (int i = 0; i < pCfg->configIndexCount; ++i) { + (pCfg->configIndexArr)[i] = i * 100; + } + raftCfgPersist(pCfg); printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex); @@ -118,6 +135,6 @@ int main() { test3(); test4(); test5(); - + return 0; } From 28b32682246270336710717942cdda0f17d435b4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 17 Jun 2022 11:40:18 +0800 Subject: [PATCH 4/4] fix(sync) add trace log --- source/libs/sync/src/syncMain.c | 8 ++++---- source/libs/sync/src/syncSnapshot.c | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 343a0e5597..dbf7bc94e0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2197,8 +2197,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE char* newStr = syncCfg2Str(&newSyncCfg); syncUtilJson2Line(oldStr); syncUtilJson2Line(newStr); - snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, %s --> %s", oldSyncCfg.replicaNum, - newSyncCfg.replicaNum, oldStr, newStr); + snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum, + newSyncCfg.replicaNum, pEntry->index, oldStr, newStr); taosMemoryFree(oldStr); taosMemoryFree(newStr); @@ -2214,8 +2214,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE char* newStr = syncCfg2Str(&newSyncCfg); syncUtilJson2Line(oldStr); syncUtilJson2Line(newStr); - snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, %s --> %s", oldSyncCfg.replicaNum, - newSyncCfg.replicaNum, oldStr, newStr); + snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum, + newSyncCfg.replicaNum, pEntry->index, oldStr, newStr); taosMemoryFree(oldStr); taosMemoryFree(newStr); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index ef2011cc26..c8d2866738 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -671,8 +671,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { char *newStr = syncCfg2Str(&newSyncCfg); syncUtilJson2Line(oldStr); syncUtilJson2Line(newStr); - snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, %s --> %s", oldSyncCfg.replicaNum, - newSyncCfg.replicaNum, oldStr, newStr); + snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, index:%ld, %s --> %s", + oldSyncCfg.replicaNum, newSyncCfg.replicaNum, pMsg->lastConfigIndex, oldStr, newStr); taosMemoryFree(oldStr); taosMemoryFree(newStr);