refactor: remove duplicate conninfo in SRpcMsg
This commit is contained in:
parent
793e6fa7d9
commit
80e0b830fe
|
@ -46,7 +46,6 @@ typedef struct SRpcHandleInfo {
|
|||
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
|
||||
int32_t persistHandle; // persist handle or not
|
||||
|
||||
SRpcConnInfo connInfo;
|
||||
// app info
|
||||
void *ahandle; // app handle set by client
|
||||
void *wrapper; // wrapper handle
|
||||
|
@ -55,6 +54,9 @@ typedef struct SRpcHandleInfo {
|
|||
// resp info
|
||||
void * rsp;
|
||||
int32_t rspLen;
|
||||
|
||||
// conn info
|
||||
SRpcConnInfo conn;
|
||||
} SRpcHandleInfo;
|
||||
|
||||
typedef struct SRpcMsg {
|
||||
|
@ -63,7 +65,6 @@ typedef struct SRpcMsg {
|
|||
int32_t contLen;
|
||||
int32_t code;
|
||||
SRpcHandleInfo info;
|
||||
SRpcConnInfo conn;
|
||||
} SRpcMsg;
|
||||
|
||||
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
|
||||
|
|
|
@ -21,21 +21,6 @@ static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet);
|
|||
static void dmSendRsp(SRpcMsg *pMsg);
|
||||
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
|
||||
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
|
||||
SRpcConnInfo *pConnInfo = &(pRpc->info.connInfo);
|
||||
// if (IsReq(pRpc)) {
|
||||
// terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||
// dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle);
|
||||
// return -1;
|
||||
//}
|
||||
|
||||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||
memcpy(pMsg->conn.user, pConnInfo->user, TSDB_USER_LEN);
|
||||
pMsg->conn.clientIp = pConnInfo->clientIp;
|
||||
pMsg->conn.clientPort = pConnInfo->clientPort;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
|
||||
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
|
||||
if (msgFp == NULL) {
|
||||
|
@ -116,14 +101,10 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
}
|
||||
|
||||
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
|
||||
if (pMsg == NULL) {
|
||||
goto _OVER;
|
||||
}
|
||||
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
|
||||
if (pMsg == NULL) goto _OVER;
|
||||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||
|
||||
if (dmBuildNodeMsg(pMsg, pRpc) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
dTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle);
|
||||
|
||||
if (InParentProc(pWrapper)) {
|
||||
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
|
||||
|
|
|
@ -56,7 +56,7 @@ static int32_t mndProcessAuthReq(SRpcMsg *pReq) {
|
|||
memcpy(authRsp.user, authReq.user, TSDB_USER_LEN);
|
||||
|
||||
int32_t code = mndRetriveAuth(pReq->info.node, &authRsp);
|
||||
mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->conn.user, authRsp.spi, authRsp.encrypt,
|
||||
mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->info.conn.user, authRsp.spi, authRsp.encrypt,
|
||||
authRsp.user);
|
||||
|
||||
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authRsp);
|
||||
|
|
|
@ -292,7 +292,7 @@ static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_BNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_BNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -394,7 +394,7 @@ static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_BNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_BNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -521,12 +521,12 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_DB, NULL) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DB, NULL) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -700,7 +700,7 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_ALTER_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_ALTER_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -980,7 +980,7 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_DROP_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1127,7 +1127,7 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) {
|
|||
|
||||
mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr());
|
||||
} else {
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_USE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_USE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1252,7 +1252,7 @@ static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_COMPACT_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -522,7 +522,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_DNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -623,7 +623,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_MNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -318,7 +318,7 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_FUNC) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_FUNC) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -365,7 +365,7 @@ static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_FUNC) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_FUNC) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -414,7 +414,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_MNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -621,7 +621,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_MNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -133,7 +133,7 @@ static void mndFreeConn(SConnObj *pConn) {
|
|||
taosWLockLatch(&pConn->queryLock);
|
||||
taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
|
||||
taosWUnLockLatch(&pConn->queryLock);
|
||||
|
||||
|
||||
mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
|
||||
}
|
||||
|
||||
|
@ -194,15 +194,15 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
goto CONN_OVER;
|
||||
}
|
||||
|
||||
taosIp2String(pReq->conn.clientIp, ip);
|
||||
taosIp2String(pReq->info.conn.clientIp, ip);
|
||||
|
||||
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) {
|
||||
mError("user:%s, failed to login while acquire user since %s", pReq->conn.user, terrstr());
|
||||
mError("user:%s, failed to login while acquire user since %s", pReq->info.conn.user, terrstr());
|
||||
goto CONN_OVER;
|
||||
}
|
||||
if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
|
||||
mError("user:%s, failed to auth while acquire user, input:%s", pReq->conn.user, connReq.passwd);
|
||||
mError("user:%s, failed to auth while acquire user, input:%s", pReq->info.conn.user, connReq.passwd);
|
||||
code = TSDB_CODE_RPC_AUTH_FAILURE;
|
||||
goto CONN_OVER;
|
||||
}
|
||||
|
@ -213,15 +213,16 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
pDb = mndAcquireDb(pMnode, db);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_INVALID_DB;
|
||||
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->conn.user, ip, connReq.db, terrstr());
|
||||
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
|
||||
terrstr());
|
||||
goto CONN_OVER;
|
||||
}
|
||||
}
|
||||
|
||||
pConn = mndCreateConn(pMnode, pReq->conn.user, connReq.connType, pReq->conn.clientIp, pReq->conn.clientPort,
|
||||
connReq.pid, connReq.app, connReq.startTime);
|
||||
pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
|
||||
pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
|
||||
if (pConn == NULL) {
|
||||
mError("user:%s, failed to login from %s while create connection since %s", pReq->conn.user, ip, terrstr());
|
||||
mError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr());
|
||||
goto CONN_OVER;
|
||||
}
|
||||
|
||||
|
@ -246,7 +247,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
pReq->info.rspLen = contLen;
|
||||
pReq->info.rsp = pRsp;
|
||||
|
||||
mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->conn.user, ip, pConn->port, pConn->id, connReq.app);
|
||||
mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app);
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -348,7 +349,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
|||
if (pHbReq->query) {
|
||||
SQueryHbReqBasic *pBasic = pHbReq->query;
|
||||
|
||||
SRpcConnInfo connInfo = pMsg->conn;
|
||||
SRpcConnInfo connInfo = pMsg->info.conn;
|
||||
|
||||
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
|
||||
if (pConn == NULL) {
|
||||
|
@ -361,7 +362,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
|||
mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
|
||||
if (rspBasic == NULL) {
|
||||
mndReleaseConn(pMnode, pConn);
|
||||
|
@ -386,7 +387,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
|||
mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
|
||||
|
||||
mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
|
||||
|
||||
|
||||
mndReleaseConn(pMnode, pConn);
|
||||
|
||||
hbRsp.query = rspBasic;
|
||||
|
@ -500,7 +501,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
|
|||
SMnode *pMnode = pReq->info.node;
|
||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||
|
||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) return 0;
|
||||
if (!pUser->superUser) {
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
|
@ -523,7 +524,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
|
|||
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
|
||||
return -1;
|
||||
} else {
|
||||
mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->conn.user);
|
||||
mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->info.conn.user);
|
||||
pConn->killId = killReq.queryId;
|
||||
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
|
||||
return 0;
|
||||
|
@ -534,7 +535,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
|
|||
SMnode *pMnode = pReq->info.node;
|
||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||
|
||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) return 0;
|
||||
if (!pUser->superUser) {
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
|
@ -555,7 +556,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
|
|||
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
|
||||
return -1;
|
||||
} else {
|
||||
mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->conn.user);
|
||||
mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->info.conn.user);
|
||||
pConn->killed = 1;
|
||||
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -563,12 +564,12 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
int32_t cols = 0;
|
||||
SConnObj *pConn = NULL;
|
||||
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
int32_t cols = 0;
|
||||
SConnObj *pConn = NULL;
|
||||
|
||||
if (pShow->pIter == NULL) {
|
||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
|
||||
|
@ -619,12 +620,12 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
}
|
||||
|
||||
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
int32_t cols = 0;
|
||||
SConnObj *pConn = NULL;
|
||||
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
int32_t cols = 0;
|
||||
SConnObj *pConn = NULL;
|
||||
|
||||
if (pShow->pIter == NULL) {
|
||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
|
||||
|
@ -645,7 +646,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
|
|||
|
||||
int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
|
||||
for (int32_t i = 0; i < numOfQueries; ++i) {
|
||||
SQueryDesc* pQuery = taosArrayGet(pConn->pQueries, i);
|
||||
SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i);
|
||||
cols = 0;
|
||||
|
||||
char queryId[26 + VARSTR_HEADER_SIZE] = {0};
|
||||
|
@ -691,14 +692,14 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false);
|
||||
|
||||
char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
int32_t strSize = sizeof(subStatus);
|
||||
int32_t offset = VARSTR_HEADER_SIZE;
|
||||
for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) {
|
||||
if (i) {
|
||||
offset += snprintf(subStatus + offset, strSize - offset - 1, ",");
|
||||
}
|
||||
SQuerySubDesc* pDesc = taosArrayGet(pQuery->subDesc, i);
|
||||
SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
|
||||
offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
|
||||
}
|
||||
varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
|
||||
|
@ -712,7 +713,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
|
|||
|
||||
numOfRows++;
|
||||
}
|
||||
|
||||
|
||||
taosRUnLockLatch(&pConn->queryLock);
|
||||
}
|
||||
|
||||
|
|
|
@ -294,7 +294,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_QNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_QNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -396,7 +396,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_QNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_QNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -229,7 +229,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
|
|||
|
||||
mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
|
||||
|
||||
// if (mndCheckShowAuth(pMnode, pReq->conn.user, pShow->type) != 0) return -1;
|
||||
// if (mndCheckShowAuth(pMnode, pReq->info.conn.user, pShow->type) != 0) return -1;
|
||||
|
||||
int32_t numOfCols = pShow->pMeta->numOfColumns;
|
||||
SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
|
|
|
@ -710,7 +710,7 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -971,7 +971,7 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -300,7 +300,7 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_SNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_SNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -404,7 +404,7 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_SNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_SNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -805,7 +805,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1454,7 +1454,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1584,7 +1584,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -545,7 +545,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) {
|
||||
if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
|
||||
mError("trans:%d, failed to create stb for stream since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
|
@ -602,7 +602,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
#endif
|
||||
|
@ -627,7 +627,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// create stb for stream
|
||||
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) {
|
||||
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
|
||||
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
goto _OVER;
|
||||
|
@ -696,7 +696,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
#if 0
|
||||
// todo check auth
|
||||
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) {
|
||||
goto DROP_STREAM_OVER;
|
||||
}
|
||||
|
|
|
@ -474,7 +474,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -1369,7 +1369,7 @@ static int32_t mndProcessKillTransReq(SRpcMsg *pReq) {
|
|||
|
||||
mInfo("trans:%d, start to kill", killReq.transId);
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_KILL_TRANS) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_KILL_TRANS) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -354,13 +354,13 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
pOperUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pOperUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pOperUser == NULL) {
|
||||
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_USER) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_USER) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -460,7 +460,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
pOperUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pOperUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pOperUser == NULL) {
|
||||
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||
goto _OVER;
|
||||
|
@ -643,7 +643,7 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_USER) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_USER) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -1200,7 +1200,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
|
|||
|
||||
mInfo("vgId:%d, start to redistribute to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER;
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER;
|
||||
|
||||
pVgroup = mndAcquireVgroup(pMnode, req.vgId);
|
||||
if (pVgroup == NULL) goto _OVER;
|
||||
|
@ -1500,7 +1500,7 @@ static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
|
|||
pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
||||
if (pDb == NULL) goto _OVER;
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_SPLIT_VGROUP) != 0) goto _OVER;
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_SPLIT_VGROUP) != 0) goto _OVER;
|
||||
|
||||
code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
@ -1624,7 +1624,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
|
|||
|
||||
mDebug("start to balance vgroup");
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_BALANCE_VGROUP) != 0) goto _OVER;
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_BALANCE_VGROUP) != 0) goto _OVER;
|
||||
|
||||
while (1) {
|
||||
SDnodeObj *pDnode = NULL;
|
||||
|
|
|
@ -184,7 +184,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
|
||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||
if (rsp.code == 0) {
|
||||
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->conn.applyIndex, &rsp) < 0) {
|
||||
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
||||
rsp.code = terrno;
|
||||
vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
|
||||
}
|
||||
|
@ -329,7 +329,7 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
|||
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
|
||||
SVnode *pVnode = pFsm->data;
|
||||
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .info.conn.applyIndex = cbMeta.index};
|
||||
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
|
||||
|
||||
vInfo("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
|
||||
|
@ -359,7 +359,7 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
|||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .info.conn.applyIndex = cbMeta.index};
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
||||
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
|
||||
|
|
|
@ -310,7 +310,7 @@ static void uvHandleReq(SSvrConn* pConn) {
|
|||
}
|
||||
|
||||
// set up conn info
|
||||
SRpcConnInfo* pConnInfo = &(transMsg.info.connInfo);
|
||||
SRpcConnInfo* pConnInfo = &(transMsg.info.conn);
|
||||
pConnInfo->clientIp = (uint32_t)(pConn->addr.sin_addr.s_addr);
|
||||
pConnInfo->clientPort = ntohs(pConn->addr.sin_port);
|
||||
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
|
||||
|
|
Loading…
Reference in New Issue