Merge pull request #12664 from taosdata/fix/dnode
refactor: remove auth func
This commit is contained in:
commit
b99dd9bd90
|
@ -60,9 +60,9 @@ typedef struct {
|
||||||
ReportStartup reportStartupFp;
|
ReportStartup reportStartupFp;
|
||||||
} SMsgCb;
|
} SMsgCb;
|
||||||
|
|
||||||
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
|
void tmsgSetDefault(const SMsgCb* msgcb);
|
||||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg);
|
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg);
|
||||||
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
|
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype);
|
||||||
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
|
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
|
||||||
void tmsgSendRsp(SRpcMsg* pMsg);
|
void tmsgSendRsp(SRpcMsg* pMsg);
|
||||||
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
|
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
|
||||||
|
|
|
@ -17,46 +17,46 @@
|
||||||
#include "tmsgcb.h"
|
#include "tmsgcb.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
|
||||||
static SMsgCb tsDefaultMsgCb;
|
static SMsgCb defaultMsgCb;
|
||||||
|
|
||||||
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
|
void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; }
|
||||||
|
|
||||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg) {
|
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) {
|
||||||
PutToQueueFp fp = pMsgCb->queueFps[qtype];
|
PutToQueueFp fp = msgcb->queueFps[qtype];
|
||||||
return (*fp)(pMsgCb->mgmt, pMsg);
|
return (*fp)(msgcb->mgmt, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
|
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype) {
|
||||||
GetQueueSizeFp fp = pMsgCb->qsizeFp;
|
GetQueueSizeFp fp = msgcb->qsizeFp;
|
||||||
return (*fp)(pMsgCb->mgmt, vgId, qtype);
|
return (*fp)(msgcb->mgmt, vgId, qtype);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
|
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
|
||||||
SendReqFp fp = tsDefaultMsgCb.sendReqFp;
|
SendReqFp fp = defaultMsgCb.sendReqFp;
|
||||||
return (*fp)(epSet, pMsg);
|
return (*fp)(epSet, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgSendRsp(SRpcMsg* pMsg) {
|
void tmsgSendRsp(SRpcMsg* pMsg) {
|
||||||
SendRspFp fp = tsDefaultMsgCb.sendRspFp;
|
SendRspFp fp = defaultMsgCb.sendRspFp;
|
||||||
return (*fp)(pMsg);
|
return (*fp)(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
|
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
|
||||||
SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
|
SendRedirectRspFp fp = defaultMsgCb.sendRedirectRspFp;
|
||||||
(*fp)(pMsg, pNewEpSet);
|
(*fp)(pMsg, pNewEpSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) {
|
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) {
|
||||||
RegisterBrokenLinkArgFp fp = tsDefaultMsgCb.registerBrokenLinkArgFp;
|
RegisterBrokenLinkArgFp fp = defaultMsgCb.registerBrokenLinkArgFp;
|
||||||
(*fp)(pMsg);
|
(*fp)(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) {
|
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) {
|
||||||
ReleaseHandleFp fp = tsDefaultMsgCb.releaseHandleFp;
|
ReleaseHandleFp fp = defaultMsgCb.releaseHandleFp;
|
||||||
(*fp)(pHandle, type);
|
(*fp)(pHandle, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgReportStartup(const char* name, const char* desc) {
|
void tmsgReportStartup(const char* name, const char* desc) {
|
||||||
ReportStartup fp = tsDefaultMsgCb.reportStartupFp;
|
ReportStartup fp = defaultMsgCb.reportStartupFp;
|
||||||
(*fp)(name, desc);
|
(*fp)(name, desc);
|
||||||
}
|
}
|
|
@ -275,7 +275,7 @@ static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus)
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
|
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
dDebug("start to process net test req");
|
dDebug("msg:%p, net test req will be processed", pMsg);
|
||||||
SRpcMsg rsp = {.code = 0, .info = pMsg->info};
|
SRpcMsg rsp = {.code = 0, .info = pMsg->info};
|
||||||
rsp.pCont = rpcMallocCont(pMsg->contLen);
|
rsp.pCont = rpcMallocCont(pMsg->contLen);
|
||||||
if (rsp.pCont == NULL) {
|
if (rsp.pCont == NULL) {
|
||||||
|
@ -287,8 +287,7 @@ void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
|
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
dDebug("start to process server startup status req");
|
dDebug("msg:%p, server startup status req will be processed", pMsg);
|
||||||
|
|
||||||
SServerStatusRsp statusRsp = {0};
|
SServerStatusRsp statusRsp = {0};
|
||||||
dmGetServerStartupStatus(pDnode, &statusRsp);
|
dmGetServerStartupStatus(pDnode, &statusRsp);
|
||||||
|
|
||||||
|
|
|
@ -78,7 +78,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
||||||
|
|
||||||
if (pWrapper->ntype == DNODE || InChildProc(pWrapper)) {
|
if (pWrapper->ntype == DNODE || InChildProc(pWrapper)) {
|
||||||
tmsgSetDefaultMsgCb(&input.msgCb);
|
tmsgSetDefault(&input.msgCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OnlyInSingleProc(pWrapper)) {
|
if (OnlyInSingleProc(pWrapper)) {
|
||||||
|
|
|
@ -344,66 +344,6 @@ void dmCleanupClient(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int32_t dmGetHideUserAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
|
|
||||||
int32_t code = 0;
|
|
||||||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
|
||||||
|
|
||||||
if (strcmp(user, INTERNAL_USER) == 0) {
|
|
||||||
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
|
|
||||||
} else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
|
|
||||||
taosEncryptPass_c((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass);
|
|
||||||
} else {
|
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (code == 0) {
|
|
||||||
memcpy(secret, pass, TSDB_PASSWORD_LEN);
|
|
||||||
*spi = 1;
|
|
||||||
*encrypt = 0;
|
|
||||||
*ckey = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
|
|
||||||
char *ckey) {
|
|
||||||
if (dmGetHideUserAuth(user, spi, encrypt, secret, ckey) == 0) {
|
|
||||||
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
SAuthReq authReq = {0};
|
|
||||||
tstrncpy(authReq.user, user, TSDB_USER_LEN);
|
|
||||||
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
|
|
||||||
void *pReq = rpcMallocCont(contLen);
|
|
||||||
tSerializeSAuthReq(pReq, contLen, &authReq);
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .info.ahandle = (void *)9528};
|
|
||||||
SRpcMsg rpcRsp = {0};
|
|
||||||
SEpSet epSet = {0};
|
|
||||||
dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
|
|
||||||
dmGetMnodeEpSet(&pDnode->data, &epSet);
|
|
||||||
dmSendRecv(&epSet, &rpcMsg, &rpcRsp);
|
|
||||||
|
|
||||||
if (rpcRsp.code != 0) {
|
|
||||||
terrno = rpcRsp.code;
|
|
||||||
dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
|
|
||||||
} else {
|
|
||||||
SAuthRsp authRsp = {0};
|
|
||||||
tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp);
|
|
||||||
memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN);
|
|
||||||
memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN);
|
|
||||||
*spi = authRsp.spi;
|
|
||||||
*encrypt = authRsp.encrypt;
|
|
||||||
dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi,
|
|
||||||
authRsp.encrypt);
|
|
||||||
}
|
|
||||||
|
|
||||||
rpcFreeCont(rpcRsp.pCont);
|
|
||||||
return rpcRsp.code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t dmInitServer(SDnode *pDnode) {
|
int32_t dmInitServer(SDnode *pDnode) {
|
||||||
SDnodeTrans *pTrans = &pDnode->trans;
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
|
|
||||||
|
@ -416,7 +356,6 @@ int32_t dmInitServer(SDnode *pDnode) {
|
||||||
rpcInit.sessions = tsMaxShellConns;
|
rpcInit.sessions = tsMaxShellConns;
|
||||||
rpcInit.connType = TAOS_CONN_SERVER;
|
rpcInit.connType = TAOS_CONN_SERVER;
|
||||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
rpcInit.afp = (RpcAfp)dmRetrieveUserAuthInfo;
|
|
||||||
rpcInit.parent = pDnode;
|
rpcInit.parent = pDnode;
|
||||||
|
|
||||||
pTrans->serverRpc = rpcOpen(&rpcInit);
|
pTrans->serverRpc = rpcOpen(&rpcInit);
|
||||||
|
|
|
@ -197,8 +197,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
||||||
goto CONN_OVER;
|
goto CONN_OVER;
|
||||||
}
|
}
|
||||||
if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
|
if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
|
||||||
mError("user:%s, failed to auth while acquire user, input:%s saved:%s", pReq->conn.user, connReq.passwd,
|
mError("user:%s, failed to auth while acquire user, input:%s", pReq->conn.user, connReq.passwd);
|
||||||
pUser->pass);
|
|
||||||
code = TSDB_CODE_RPC_AUTH_FAILURE;
|
code = TSDB_CODE_RPC_AUTH_FAILURE;
|
||||||
goto CONN_OVER;
|
goto CONN_OVER;
|
||||||
}
|
}
|
||||||
|
|
|
@ -343,19 +343,20 @@ void mndStop(SMnode *pMnode) { return mndCleanupTimer(pMnode); }
|
||||||
int32_t mndProcessMsg(SRpcMsg *pMsg) {
|
int32_t mndProcessMsg(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
void *ahandle = pMsg->info.ahandle;
|
void *ahandle = pMsg->info.ahandle;
|
||||||
|
|
||||||
mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle);
|
mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle);
|
||||||
|
|
||||||
if (IsReq(pMsg) && !mndIsMaster(pMnode)) {
|
if (IsReq(pMsg)) {
|
||||||
terrno = TSDB_CODE_APP_NOT_READY;
|
if (!mndIsMaster(pMnode)) {
|
||||||
mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
|
terrno = TSDB_CODE_APP_NOT_READY;
|
||||||
return -1;
|
mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
|
||||||
}
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (IsReq(pMsg) && (pMsg->contLen == 0 || pMsg->pCont == NULL)) {
|
if (pMsg->contLen == 0 || pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||||
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
|
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
|
||||||
return -1;
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
|
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
|
||||||
|
|
|
@ -56,7 +56,7 @@ class MndTestTrans2 : public ::testing::Test {
|
||||||
msgCb.sendReqFp = sendReq;
|
msgCb.sendReqFp = sendReq;
|
||||||
msgCb.sendRspFp = sendRsp;
|
msgCb.sendRspFp = sendRsp;
|
||||||
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
|
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
|
||||||
tmsgSetDefaultMsgCb(&msgCb);
|
tmsgSetDefault(&msgCb);
|
||||||
|
|
||||||
SMnodeOpt opt = {0};
|
SMnodeOpt opt = {0};
|
||||||
opt.deploy = 1;
|
opt.deploy = 1;
|
||||||
|
|
Loading…
Reference in New Issue