enh(rpc):add auth

This commit is contained in:
yihaoDeng 2022-04-21 17:51:33 +08:00
parent 676a9b87f1
commit 3278a2309b
5 changed files with 36 additions and 19 deletions

View File

@ -326,6 +326,13 @@ int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp); int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(const void* buf, SEpSet* pEp); void* taosDecodeSEpSet(const void* buf, SEpSet* pEp);
typedef struct {
SEpSet epSet;
} SMEpSet;
int32_t tSerializeSMEpSet(void* buf, int32_t bufLen, SMEpSet* pReq);
int32_t tDeserializeSMEpSet(void* buf, int32_t buflen, SMEpSet* pReq);
typedef struct { typedef struct {
int8_t connType; int8_t connType;
int32_t pid; int32_t pid;
@ -2690,6 +2697,7 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
} }
return buf; return buf;
} }
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -828,6 +828,28 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) {
taosArrayDestroy(pReq->pFields); taosArrayDestroy(pReq->pFields);
pReq->pFields = NULL; pReq->pFields = NULL;
} }
int32_t tSerializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tEncodeSEpSet(&encoder, &pReq->epSet) < 0) {
return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tDecodeSEpSet(&decoder, &pReq->epSet) < 0) {
return -1;
}
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq) { int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq) {
SCoder encoder = {0}; SCoder encoder = {0};

View File

@ -100,11 +100,10 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
memset(&rpcMsg, 0, sizeof(rpcMsg)); memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.contLen = sizeof(SEpSet); SMEpSet msg = {.epSet = *pEpSet};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
if (rpcMsg.pCont == NULL) return; rpcMsg.pCont = rpcMallocCont(len);
tSerializeSMEpSet(rpcMsg.pCont, len, &msg);
memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet));
rpcMsg.code = TSDB_CODE_RPC_REDIRECT; rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
rpcMsg.handle = thandle; rpcMsg.handle = thandle;

View File

@ -33,10 +33,6 @@ typedef struct SCliConn {
bool broken; // link broken or not bool broken; // link broken or not
ConnStatus status; // ConnStatus status; //
int release; // 1: release
// spi configure
char spi;
char secured;
char* ip; char* ip;
uint32_t port; uint32_t port;
@ -44,7 +40,6 @@ typedef struct SCliConn {
// debug and log info // debug and log info
struct sockaddr_in addr; struct sockaddr_in addr;
struct sockaddr_in locaddr; struct sockaddr_in locaddr;
} SCliConn; } SCliConn;
typedef struct SCliMsg { typedef struct SCliMsg {
@ -303,8 +298,6 @@ void cliHandleResp(SCliConn* conn) {
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
conn->secured = pHead->secured;
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
tTrace("except, server continue send while cli ignore it"); tTrace("except, server continue send while cli ignore it");
// transUnrefCliHandle(conn); // transUnrefCliHandle(conn);

View File

@ -30,7 +30,6 @@ typedef struct SSrvConn {
uv_timer_t pTimer; uv_timer_t pTimer;
queue queue; queue queue;
int ref;
int persist; // persist connection or not int persist; // persist connection or not
SConnBuffer readBuf; // read buf, SConnBuffer readBuf; // read buf,
int inType; int inType;
@ -692,8 +691,6 @@ static void uvDestroyConn(uv_handle_t* handle) {
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
tTrace("work thread quit"); tTrace("work thread quit");
uv_walk(thrd->loop, uvWalkCb, NULL); uv_walk(thrd->loop, uvWalkCb, NULL);
// uv_loop_close(thrd->loop);
// uv_stop(thrd->loop);
} }
} }
@ -756,8 +753,6 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
thrd->quit = true; thrd->quit = true;
if (QUEUE_IS_EMPTY(&thrd->conn)) { if (QUEUE_IS_EMPTY(&thrd->conn)) {
uv_walk(thrd->loop, uvWalkCb, NULL); uv_walk(thrd->loop, uvWalkCb, NULL);
// uv_loop_close(thrd->loop);
// uv_stop(thrd->loop);
} else { } else {
destroyAllConn(thrd); destroyAllConn(thrd);
} }