Merge pull request #13924 from taosdata/fix/dnode
refactor: remove duplicate conninfo in SRpcMsg
This commit is contained in:
commit
21e9ef3264
|
@ -192,6 +192,7 @@ bool syncEnvIsStart();
|
|||
const char* syncStr(ESyncState state);
|
||||
bool syncIsRestoreFinish(int64_t rid);
|
||||
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
|
||||
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
|
||||
|
||||
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
|
||||
|
||||
|
|
|
@ -46,7 +46,6 @@ typedef struct SRpcHandleInfo {
|
|||
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
|
||||
int32_t persistHandle; // persist handle or not
|
||||
|
||||
SRpcConnInfo connInfo;
|
||||
// app info
|
||||
void *ahandle; // app handle set by client
|
||||
void *wrapper; // wrapper handle
|
||||
|
@ -55,6 +54,9 @@ typedef struct SRpcHandleInfo {
|
|||
// resp info
|
||||
void * rsp;
|
||||
int32_t rspLen;
|
||||
|
||||
// conn info
|
||||
SRpcConnInfo conn;
|
||||
} SRpcHandleInfo;
|
||||
|
||||
typedef struct SRpcMsg {
|
||||
|
@ -63,7 +65,6 @@ typedef struct SRpcMsg {
|
|||
int32_t contLen;
|
||||
int32_t code;
|
||||
SRpcHandleInfo info;
|
||||
SRpcConnInfo conn;
|
||||
} SRpcMsg;
|
||||
|
||||
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
|
||||
|
|
|
@ -21,21 +21,6 @@ static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet);
|
|||
static void dmSendRsp(SRpcMsg *pMsg);
|
||||
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
|
||||
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
|
||||
SRpcConnInfo *pConnInfo = &(pRpc->info.connInfo);
|
||||
// if (IsReq(pRpc)) {
|
||||
// terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||
// dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle);
|
||||
// return -1;
|
||||
//}
|
||||
|
||||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||
memcpy(pMsg->conn.user, pConnInfo->user, TSDB_USER_LEN);
|
||||
pMsg->conn.clientIp = pConnInfo->clientIp;
|
||||
pMsg->conn.clientPort = pConnInfo->clientPort;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
|
||||
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
|
||||
if (msgFp == NULL) {
|
||||
|
@ -116,14 +101,10 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
}
|
||||
|
||||
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
|
||||
if (pMsg == NULL) {
|
||||
goto _OVER;
|
||||
}
|
||||
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
|
||||
if (pMsg == NULL) goto _OVER;
|
||||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||
|
||||
if (dmBuildNodeMsg(pMsg, pRpc) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
dTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle);
|
||||
|
||||
if (InParentProc(pWrapper)) {
|
||||
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
|
||||
|
|
|
@ -56,7 +56,7 @@ static int32_t mndProcessAuthReq(SRpcMsg *pReq) {
|
|||
memcpy(authRsp.user, authReq.user, TSDB_USER_LEN);
|
||||
|
||||
int32_t code = mndRetriveAuth(pReq->info.node, &authRsp);
|
||||
mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->conn.user, authRsp.spi, authRsp.encrypt,
|
||||
mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->info.conn.user, authRsp.spi, authRsp.encrypt,
|
||||
authRsp.user);
|
||||
|
||||
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authRsp);
|
||||
|
|
|
@ -292,7 +292,7 @@ static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_BNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_BNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -394,7 +394,7 @@ static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_BNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_BNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -521,12 +521,12 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_DB, NULL) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DB, NULL) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -700,7 +700,7 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_ALTER_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_ALTER_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -980,7 +980,7 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_DROP_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1127,7 +1127,7 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) {
|
|||
|
||||
mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr());
|
||||
} else {
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_USE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_USE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1252,7 +1252,7 @@ static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_COMPACT_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -522,7 +522,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_DNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -623,7 +623,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_MNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -318,7 +318,7 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_FUNC) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_FUNC) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -365,7 +365,7 @@ static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_FUNC) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_FUNC) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -414,7 +414,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_MNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -621,7 +621,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_MNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -133,7 +133,7 @@ static void mndFreeConn(SConnObj *pConn) {
|
|||
taosWLockLatch(&pConn->queryLock);
|
||||
taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
|
||||
taosWUnLockLatch(&pConn->queryLock);
|
||||
|
||||
|
||||
mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
|
||||
}
|
||||
|
||||
|
@ -194,15 +194,15 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
goto CONN_OVER;
|
||||
}
|
||||
|
||||
taosIp2String(pReq->conn.clientIp, ip);
|
||||
taosIp2String(pReq->info.conn.clientIp, ip);
|
||||
|
||||
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) {
|
||||
mError("user:%s, failed to login while acquire user since %s", pReq->conn.user, terrstr());
|
||||
mError("user:%s, failed to login while acquire user since %s", pReq->info.conn.user, terrstr());
|
||||
goto CONN_OVER;
|
||||
}
|
||||
if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
|
||||
mError("user:%s, failed to auth while acquire user, input:%s", pReq->conn.user, connReq.passwd);
|
||||
mError("user:%s, failed to auth while acquire user, input:%s", pReq->info.conn.user, connReq.passwd);
|
||||
code = TSDB_CODE_RPC_AUTH_FAILURE;
|
||||
goto CONN_OVER;
|
||||
}
|
||||
|
@ -213,15 +213,16 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
pDb = mndAcquireDb(pMnode, db);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_INVALID_DB;
|
||||
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->conn.user, ip, connReq.db, terrstr());
|
||||
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
|
||||
terrstr());
|
||||
goto CONN_OVER;
|
||||
}
|
||||
}
|
||||
|
||||
pConn = mndCreateConn(pMnode, pReq->conn.user, connReq.connType, pReq->conn.clientIp, pReq->conn.clientPort,
|
||||
connReq.pid, connReq.app, connReq.startTime);
|
||||
pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
|
||||
pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
|
||||
if (pConn == NULL) {
|
||||
mError("user:%s, failed to login from %s while create connection since %s", pReq->conn.user, ip, terrstr());
|
||||
mError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr());
|
||||
goto CONN_OVER;
|
||||
}
|
||||
|
||||
|
@ -246,7 +247,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
pReq->info.rspLen = contLen;
|
||||
pReq->info.rsp = pRsp;
|
||||
|
||||
mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->conn.user, ip, pConn->port, pConn->id, connReq.app);
|
||||
mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app);
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -348,7 +349,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
|||
if (pHbReq->query) {
|
||||
SQueryHbReqBasic *pBasic = pHbReq->query;
|
||||
|
||||
SRpcConnInfo connInfo = pMsg->conn;
|
||||
SRpcConnInfo connInfo = pMsg->info.conn;
|
||||
|
||||
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
|
||||
if (pConn == NULL) {
|
||||
|
@ -361,7 +362,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
|||
mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
|
||||
if (rspBasic == NULL) {
|
||||
mndReleaseConn(pMnode, pConn);
|
||||
|
@ -386,7 +387,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
|||
mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
|
||||
|
||||
mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
|
||||
|
||||
|
||||
mndReleaseConn(pMnode, pConn);
|
||||
|
||||
hbRsp.query = rspBasic;
|
||||
|
@ -500,7 +501,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
|
|||
SMnode *pMnode = pReq->info.node;
|
||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||
|
||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) return 0;
|
||||
if (!pUser->superUser) {
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
|
@ -523,7 +524,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
|
|||
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
|
||||
return -1;
|
||||
} else {
|
||||
mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->conn.user);
|
||||
mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->info.conn.user);
|
||||
pConn->killId = killReq.queryId;
|
||||
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
|
||||
return 0;
|
||||
|
@ -534,7 +535,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
|
|||
SMnode *pMnode = pReq->info.node;
|
||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||
|
||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) return 0;
|
||||
if (!pUser->superUser) {
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
|
@ -555,7 +556,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
|
|||
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
|
||||
return -1;
|
||||
} else {
|
||||
mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->conn.user);
|
||||
mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->info.conn.user);
|
||||
pConn->killed = 1;
|
||||
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -563,12 +564,12 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
int32_t cols = 0;
|
||||
SConnObj *pConn = NULL;
|
||||
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
int32_t cols = 0;
|
||||
SConnObj *pConn = NULL;
|
||||
|
||||
if (pShow->pIter == NULL) {
|
||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
|
||||
|
@ -619,12 +620,12 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
}
|
||||
|
||||
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
int32_t cols = 0;
|
||||
SConnObj *pConn = NULL;
|
||||
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
int32_t cols = 0;
|
||||
SConnObj *pConn = NULL;
|
||||
|
||||
if (pShow->pIter == NULL) {
|
||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
|
||||
|
@ -645,7 +646,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
|
|||
|
||||
int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
|
||||
for (int32_t i = 0; i < numOfQueries; ++i) {
|
||||
SQueryDesc* pQuery = taosArrayGet(pConn->pQueries, i);
|
||||
SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i);
|
||||
cols = 0;
|
||||
|
||||
char queryId[26 + VARSTR_HEADER_SIZE] = {0};
|
||||
|
@ -691,14 +692,14 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false);
|
||||
|
||||
char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
int32_t strSize = sizeof(subStatus);
|
||||
int32_t offset = VARSTR_HEADER_SIZE;
|
||||
for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) {
|
||||
if (i) {
|
||||
offset += snprintf(subStatus + offset, strSize - offset - 1, ",");
|
||||
}
|
||||
SQuerySubDesc* pDesc = taosArrayGet(pQuery->subDesc, i);
|
||||
SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
|
||||
offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
|
||||
}
|
||||
varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
|
||||
|
@ -712,7 +713,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
|
|||
|
||||
numOfRows++;
|
||||
}
|
||||
|
||||
|
||||
taosRUnLockLatch(&pConn->queryLock);
|
||||
}
|
||||
|
||||
|
|
|
@ -294,7 +294,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_QNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_QNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -396,7 +396,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_QNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_QNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -229,7 +229,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
|
|||
|
||||
mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
|
||||
|
||||
// if (mndCheckShowAuth(pMnode, pReq->conn.user, pShow->type) != 0) return -1;
|
||||
// if (mndCheckShowAuth(pMnode, pReq->info.conn.user, pShow->type) != 0) return -1;
|
||||
|
||||
int32_t numOfCols = pShow->pMeta->numOfColumns;
|
||||
SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
|
|
|
@ -710,7 +710,7 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -971,7 +971,7 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -300,7 +300,7 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_SNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_SNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -404,7 +404,7 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_SNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_SNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -805,7 +805,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1454,7 +1454,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1584,7 +1584,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -545,7 +545,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) {
|
||||
if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
|
||||
mError("trans:%d, failed to create stb for stream since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
|
@ -602,7 +602,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
#endif
|
||||
|
@ -627,7 +627,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// create stb for stream
|
||||
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) {
|
||||
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
|
||||
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
goto _OVER;
|
||||
|
@ -696,7 +696,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
#if 0
|
||||
// todo check auth
|
||||
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) {
|
||||
goto DROP_STREAM_OVER;
|
||||
}
|
||||
|
|
|
@ -70,7 +70,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
|
|||
|
||||
if (cbMeta.index - sdbGetApplyIndex(pMnode->pSdb) > 100) {
|
||||
SSnapshotMeta sMeta = {0};
|
||||
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
|
||||
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
|
||||
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, cbMeta.index, &sMeta) == 0) {
|
||||
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
|
||||
}
|
||||
sdbWriteFile(pMnode->pSdb);
|
||||
|
@ -90,7 +91,10 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
|
|||
SMnode *pMnode = pFsm->data;
|
||||
|
||||
SSnapshotMeta sMeta = {0};
|
||||
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
|
||||
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
|
||||
|
||||
SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb);
|
||||
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) {
|
||||
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
|
||||
}
|
||||
|
||||
|
|
|
@ -474,7 +474,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,8 +22,8 @@
|
|||
#include "mndSync.h"
|
||||
#include "mndUser.h"
|
||||
|
||||
#define TRANS_VER_NUMBER 1
|
||||
#define TRANS_ARRAY_SIZE 8
|
||||
#define TRANS_VER_NUMBER 1
|
||||
#define TRANS_ARRAY_SIZE 8
|
||||
#define TRANS_RESERVE_SIZE 64
|
||||
|
||||
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
|
||||
|
@ -1369,7 +1369,7 @@ static int32_t mndProcessKillTransReq(SRpcMsg *pReq) {
|
|||
|
||||
mInfo("trans:%d, start to kill", killReq.transId);
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_KILL_TRANS) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_KILL_TRANS) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1417,7 +1417,9 @@ void mndTransPullup(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
SSnapshotMeta sMeta = {0};
|
||||
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
|
||||
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
|
||||
SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb);
|
||||
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) {
|
||||
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
|
||||
}
|
||||
sdbWriteFile(pMnode->pSdb);
|
||||
|
|
|
@ -354,13 +354,13 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
pOperUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pOperUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pOperUser == NULL) {
|
||||
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_USER) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_USER) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -460,7 +460,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
pOperUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pOperUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pOperUser == NULL) {
|
||||
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||
goto _OVER;
|
||||
|
@ -643,7 +643,7 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_USER) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_USER) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -1200,7 +1200,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
|
|||
|
||||
mInfo("vgId:%d, start to redistribute to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER;
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER;
|
||||
|
||||
pVgroup = mndAcquireVgroup(pMnode, req.vgId);
|
||||
if (pVgroup == NULL) goto _OVER;
|
||||
|
@ -1500,7 +1500,7 @@ static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
|
|||
pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
||||
if (pDb == NULL) goto _OVER;
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_SPLIT_VGROUP) != 0) goto _OVER;
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_SPLIT_VGROUP) != 0) goto _OVER;
|
||||
|
||||
code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
@ -1624,7 +1624,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
|
|||
|
||||
mDebug("start to balance vgroup");
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_BALANCE_VGROUP) != 0) goto _OVER;
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_BALANCE_VGROUP) != 0) goto _OVER;
|
||||
|
||||
while (1) {
|
||||
SDnodeObj *pDnode = NULL;
|
||||
|
|
|
@ -184,7 +184,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
|
||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||
if (rsp.code == 0) {
|
||||
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->conn.applyIndex, &rsp) < 0) {
|
||||
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
||||
rsp.code = terrno;
|
||||
vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
|
||||
}
|
||||
|
@ -329,8 +329,9 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
|||
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
|
||||
SVnode *pVnode = pFsm->data;
|
||||
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
|
||||
rpcMsg.info.conn.applyIndex = cbMeta.index;
|
||||
|
||||
vInfo("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
|
||||
TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle);
|
||||
|
@ -359,10 +360,11 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
|||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
||||
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
|
||||
rpcMsg.info.conn.applyIndex = cbMeta.index;
|
||||
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
||||
|
||||
} else {
|
||||
|
|
|
@ -29,6 +29,8 @@ extern "C" {
|
|||
|
||||
#define CONFIG_FILE_LEN 1024
|
||||
|
||||
#define MAX_CONFIG_INDEX_COUNT 512
|
||||
|
||||
typedef struct SRaftCfg {
|
||||
SSyncCfg cfg;
|
||||
TdFilePtr pFile;
|
||||
|
@ -36,6 +38,10 @@ typedef struct SRaftCfg {
|
|||
int8_t isStandBy;
|
||||
int8_t snapshotEnable;
|
||||
SyncIndex lastConfigIndex;
|
||||
|
||||
SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT];
|
||||
int32_t configIndexCount;
|
||||
|
||||
} SRaftCfg;
|
||||
|
||||
SRaftCfg *raftCfgOpen(const char *path);
|
||||
|
|
|
@ -420,6 +420,29 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return -1;
|
||||
}
|
||||
assert(rid == pSyncNode->rid);
|
||||
|
||||
ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
|
||||
SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0];
|
||||
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
|
||||
if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex &&
|
||||
(pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotIndex) {
|
||||
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
|
||||
}
|
||||
}
|
||||
sMeta->lastConfigIndex = lastIndex;
|
||||
sTrace("sync get snapshot meta by index:%ld lastConfigIndex:%ld", snapshotIndex, sMeta->lastConfigIndex);
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const char* syncGetMyRoleStr(int64_t rid) {
|
||||
const char* s = syncUtilState2String(syncGetMyRole(rid));
|
||||
return s;
|
||||
|
@ -2197,8 +2220,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
|
|||
char* newStr = syncCfg2Str(&newSyncCfg);
|
||||
syncUtilJson2Line(oldStr);
|
||||
syncUtilJson2Line(newStr);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, %s --> %s", oldSyncCfg.replicaNum,
|
||||
newSyncCfg.replicaNum, oldStr, newStr);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
|
||||
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
|
||||
taosMemoryFree(oldStr);
|
||||
taosMemoryFree(newStr);
|
||||
|
||||
|
@ -2214,8 +2237,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
|
|||
char* newStr = syncCfg2Str(&newSyncCfg);
|
||||
syncUtilJson2Line(oldStr);
|
||||
syncUtilJson2Line(newStr);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, %s --> %s", oldSyncCfg.replicaNum,
|
||||
newSyncCfg.replicaNum, oldStr, newStr);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
|
||||
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
|
||||
taosMemoryFree(oldStr);
|
||||
taosMemoryFree(newStr);
|
||||
|
||||
|
@ -2297,6 +2320,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
|||
}
|
||||
|
||||
// restore finish
|
||||
// if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
|
||||
if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
|
||||
if (ths->restoreFinish == false) {
|
||||
if (ths->pFsm->FpRestoreFinishCb != NULL) {
|
||||
|
|
|
@ -85,16 +85,11 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
|
|||
}
|
||||
|
||||
return pRoot;
|
||||
/*
|
||||
cJSON *pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SSyncCfg", pRoot);
|
||||
return pJson;
|
||||
*/
|
||||
}
|
||||
|
||||
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
||||
cJSON *pJson = syncCfg2Json(pSyncCfg);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
@ -154,6 +149,16 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
|
|||
snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex);
|
||||
cJSON_AddStringToObject(pRoot, "lastConfigIndex", buf64);
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "configIndexCount", pRaftCfg->configIndexCount);
|
||||
cJSON *pIndexArr = cJSON_CreateArray();
|
||||
cJSON_AddItemToObject(pRoot, "configIndexArr", pIndexArr);
|
||||
for (int i = 0; i < pRaftCfg->configIndexCount; ++i) {
|
||||
snprintf(buf64, sizeof(buf64), "%ld", (pRaftCfg->configIndexArr)[i]);
|
||||
cJSON *pIndexObj = cJSON_CreateObject();
|
||||
cJSON_AddStringToObject(pIndexObj, "index", buf64);
|
||||
cJSON_AddItemToArray(pIndexArr, pIndexObj);
|
||||
}
|
||||
|
||||
cJSON *pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "RaftCfg", pRoot);
|
||||
return pJson;
|
||||
|
@ -161,7 +166,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
|
|||
|
||||
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
|
||||
cJSON *pJson = raftCfg2Json(pRaftCfg);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
@ -177,6 +182,9 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
|
|||
raftCfg.isStandBy = meta.isStandBy;
|
||||
raftCfg.snapshotEnable = meta.snapshotEnable;
|
||||
raftCfg.lastConfigIndex = meta.lastConfigIndex;
|
||||
raftCfg.configIndexCount = 1;
|
||||
memset(raftCfg.configIndexArr, 0, sizeof(raftCfg.configIndexArr));
|
||||
raftCfg.configIndexArr[0] = -1;
|
||||
char *s = raftCfg2Str(&raftCfg);
|
||||
|
||||
char buf[CONFIG_FILE_LEN] = {0};
|
||||
|
@ -207,7 +215,24 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
|
|||
cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex");
|
||||
pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex));
|
||||
|
||||
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||
cJSON *pJsonConfigIndexCount = cJSON_GetObjectItem(pJson, "configIndexCount");
|
||||
pRaftCfg->configIndexCount = cJSON_GetNumberValue(pJsonConfigIndexCount);
|
||||
|
||||
cJSON *pIndexArr = cJSON_GetObjectItem(pJson, "configIndexArr");
|
||||
int arraySize = cJSON_GetArraySize(pIndexArr);
|
||||
assert(arraySize == pRaftCfg->configIndexCount);
|
||||
|
||||
memset(pRaftCfg->configIndexArr, 0, sizeof(pRaftCfg->configIndexArr));
|
||||
for (int i = 0; i < arraySize; ++i) {
|
||||
cJSON *pIndexObj = cJSON_GetArrayItem(pIndexArr, i);
|
||||
assert(pIndexObj != NULL);
|
||||
|
||||
cJSON *pIndex = cJSON_GetObjectItem(pIndexObj, "index");
|
||||
assert(cJSON_IsString(pIndex));
|
||||
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
|
||||
}
|
||||
|
||||
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
|
||||
ASSERT(code == 0);
|
||||
|
||||
|
|
|
@ -410,20 +410,18 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
|
|||
}
|
||||
|
||||
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||
|
||||
SSyncLogStoreData* pData = pLogStore->data;
|
||||
SWal* pWal = pData->pWal;
|
||||
// assert(walCommit(pWal, index) == 0);
|
||||
int32_t code = walCommit(pWal, index);
|
||||
if (code != 0) {
|
||||
int32_t err = terrno;
|
||||
const char* errStr = tstrerror(err);
|
||||
int32_t linuxErr = errno;
|
||||
const char* linuxErrMsg = strerror(errno);
|
||||
sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
|
||||
linuxErrMsg); ASSERT(0);
|
||||
}
|
||||
|
||||
SSyncLogStoreData* pData = pLogStore->data;
|
||||
SWal* pWal = pData->pWal;
|
||||
// assert(walCommit(pWal, index) == 0);
|
||||
int32_t code = walCommit(pWal, index);
|
||||
if (code != 0) {
|
||||
int32_t err = terrno;
|
||||
const char* errStr = tstrerror(err);
|
||||
int32_t linuxErr = errno;
|
||||
const char* linuxErrMsg = strerror(errno);
|
||||
sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg);
|
||||
ASSERT(0);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -421,7 +421,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
|
|||
|
||||
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
|
||||
cJSON *pJson = snapshotSender2Json(pSender);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
@ -542,7 +542,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
|||
cJSON_AddStringToObject(pFromId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pReceiver->fromId.addr;
|
||||
cJSON *pTmp = pFromId;
|
||||
cJSON * pTmp = pFromId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
|
@ -566,7 +566,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
|||
|
||||
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
|
||||
cJSON *pJson = snapshotReceiver2Json(pReceiver);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
@ -671,8 +671,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
char *newStr = syncCfg2Str(&newSyncCfg);
|
||||
syncUtilJson2Line(oldStr);
|
||||
syncUtilJson2Line(newStr);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, %s --> %s", oldSyncCfg.replicaNum,
|
||||
newSyncCfg.replicaNum, oldStr, newStr);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, index:%ld, %s --> %s",
|
||||
oldSyncCfg.replicaNum, newSyncCfg.replicaNum, pMsg->lastConfigIndex, oldStr, newStr);
|
||||
taosMemoryFree(oldStr);
|
||||
taosMemoryFree(newStr);
|
||||
|
||||
|
|
|
@ -27,6 +27,14 @@ SRaftCfg* createRaftCfg() {
|
|||
}
|
||||
pCfg->isStandBy = taosGetTimestampSec() % 100;
|
||||
|
||||
pCfg->configIndexCount = 5;
|
||||
for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) {
|
||||
(pCfg->configIndexArr)[i] = -1;
|
||||
}
|
||||
for (int i = 0; i < pCfg->configIndexCount; ++i) {
|
||||
(pCfg->configIndexArr)[i] = i * 100;
|
||||
}
|
||||
|
||||
return pCfg;
|
||||
}
|
||||
|
||||
|
@ -100,6 +108,15 @@ void test5() {
|
|||
pCfg->isStandBy += 2;
|
||||
pCfg->snapshotEnable += 3;
|
||||
pCfg->lastConfigIndex += 1000;
|
||||
|
||||
pCfg->configIndexCount = 5;
|
||||
for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) {
|
||||
(pCfg->configIndexArr)[i] = -1;
|
||||
}
|
||||
for (int i = 0; i < pCfg->configIndexCount; ++i) {
|
||||
(pCfg->configIndexArr)[i] = i * 100;
|
||||
}
|
||||
|
||||
raftCfgPersist(pCfg);
|
||||
|
||||
printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex);
|
||||
|
@ -118,6 +135,6 @@ int main() {
|
|||
test3();
|
||||
test4();
|
||||
test5();
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -310,7 +310,7 @@ static void uvHandleReq(SSvrConn* pConn) {
|
|||
}
|
||||
|
||||
// set up conn info
|
||||
SRpcConnInfo* pConnInfo = &(transMsg.info.connInfo);
|
||||
SRpcConnInfo* pConnInfo = &(transMsg.info.conn);
|
||||
pConnInfo->clientIp = (uint32_t)(pConn->addr.sin_addr.s_addr);
|
||||
pConnInfo->clientPort = ntohs(pConn->addr.sin_port);
|
||||
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
|
||||
|
|
Loading…
Reference in New Issue