opt parameter
This commit is contained in:
parent
001834f419
commit
8823358ca7
|
@ -64,7 +64,7 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
|
||||||
SRpcMsg rpcMsg = {.pCont = pHead,
|
SRpcMsg rpcMsg = {.pCont = pHead,
|
||||||
.contLen = contLen,
|
.contLen = contLen,
|
||||||
.msgType = TDMT_MND_RETRIEVE_IP_WHITE,
|
.msgType = TDMT_MND_RETRIEVE_IP_WHITE,
|
||||||
.info.ahandle = (void *)0x9527,
|
.info.ahandle = 0,
|
||||||
.info.refId = 0,
|
.info.refId = 0,
|
||||||
.info.noResp = 0,
|
.info.noResp = 0,
|
||||||
.info.handle = 0};
|
.info.handle = 0};
|
||||||
|
@ -180,7 +180,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
||||||
SRpcMsg rpcMsg = {.pCont = pHead,
|
SRpcMsg rpcMsg = {.pCont = pHead,
|
||||||
.contLen = contLen,
|
.contLen = contLen,
|
||||||
.msgType = TDMT_MND_STATUS,
|
.msgType = TDMT_MND_STATUS,
|
||||||
.info.ahandle = (void *)0x9527,
|
.info.ahandle = 0,
|
||||||
.info.refId = 0,
|
.info.refId = 0,
|
||||||
.info.noResp = 0,
|
.info.noResp = 0,
|
||||||
.info.handle = 0};
|
.info.handle = 0};
|
||||||
|
@ -229,7 +229,7 @@ void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
|
||||||
SRpcMsg rpcMsg = {.pCont = pHead,
|
SRpcMsg rpcMsg = {.pCont = pHead,
|
||||||
.contLen = contLen,
|
.contLen = contLen,
|
||||||
.msgType = TDMT_MND_NOTIFY,
|
.msgType = TDMT_MND_NOTIFY,
|
||||||
.info.ahandle = (void *)0x9527,
|
.info.ahandle = 0,
|
||||||
.info.refId = 0,
|
.info.refId = 0,
|
||||||
.info.noResp = 1,
|
.info.noResp = 1,
|
||||||
.info.handle = 0};
|
.info.handle = 0};
|
||||||
|
|
|
@ -210,8 +210,7 @@ static void mndPullupGrant(SMnode *pMnode) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildTimerMsg(&contLen);
|
void *pReq = mndBuildTimerMsg(&contLen);
|
||||||
if (pReq != NULL) {
|
if (pReq != NULL) {
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = 0};
|
||||||
.msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527};
|
|
||||||
// TODO check return value
|
// TODO check return value
|
||||||
(void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
(void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -169,16 +169,15 @@ typedef struct {
|
||||||
|
|
||||||
#define TRANS_VER 2
|
#define TRANS_VER 2
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char version : 4; // RPC version
|
char version : 4; // RPC version
|
||||||
char comp : 2; // compression algorithm, 0:no compression 1:lz4
|
char comp : 2; // compression algorithm, 0:no compression 1:lz4
|
||||||
char noResp : 2; // noResp bits, 0: resp, 1: resp
|
char noResp : 2; // noResp bits, 0: resp, 1: resp
|
||||||
char toInit : 2; // 0: sent user info or not
|
char withUserInfo : 2; // 0: sent user info or not
|
||||||
char secured : 2;
|
char secured : 2;
|
||||||
char spi : 2;
|
char spi : 2;
|
||||||
char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset
|
char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset
|
||||||
|
|
||||||
uint64_t timestamp;
|
uint64_t timestamp;
|
||||||
char user[TSDB_UNI_LEN];
|
|
||||||
int32_t compatibilityVer;
|
int32_t compatibilityVer;
|
||||||
uint32_t magicNum;
|
uint32_t magicNum;
|
||||||
STraceId traceId;
|
STraceId traceId;
|
||||||
|
|
|
@ -100,8 +100,8 @@ typedef struct SCliConn {
|
||||||
int8_t registered;
|
int8_t registered;
|
||||||
int8_t connnected;
|
int8_t connnected;
|
||||||
SHashObj* pQTable;
|
SHashObj* pQTable;
|
||||||
int8_t inited;
|
int8_t userInited;
|
||||||
void* initPacket;
|
void* pInitUserReq;
|
||||||
} SCliConn;
|
} SCliConn;
|
||||||
|
|
||||||
// #define TRANS_CONN_REF_INC(tconn) \
|
// #define TRANS_CONN_REF_INC(tconn) \
|
||||||
|
@ -523,6 +523,10 @@ void cliHandleResp2(SCliConn* conn) {
|
||||||
SCliThrd* pThrd = conn->hostThrd;
|
SCliThrd* pThrd = conn->hostThrd;
|
||||||
STrans* pInst = pThrd->pInst;
|
STrans* pInst = pThrd->pInst;
|
||||||
|
|
||||||
|
if (conn->pInitUserReq) {
|
||||||
|
taosMemoryFree(conn->pInitUserReq);
|
||||||
|
conn->pInitUserReq = NULL;
|
||||||
|
}
|
||||||
cliResetConnTimer(conn);
|
cliResetConnTimer(conn);
|
||||||
SCliReq* pReq = NULL;
|
SCliReq* pReq = NULL;
|
||||||
|
|
||||||
|
@ -1123,8 +1127,8 @@ _exception:
|
||||||
void cliDestroyMsg(void* arg) {
|
void cliDestroyMsg(void* arg) {
|
||||||
queue* e = arg;
|
queue* e = arg;
|
||||||
SCliReq* pReq = QUEUE_DATA(e, SCliReq, q);
|
SCliReq* pReq = QUEUE_DATA(e, SCliReq, q);
|
||||||
if (pReq->msg.info.notFreeAhandle == 0) {
|
if (pReq->msg.info.notFreeAhandle == 0 && pReq->ctx->ahandle != 0) {
|
||||||
taosMemoryFree(pReq->ctx->ahandle);
|
// taosMemoryFree(pReq->ctx->ahandle);
|
||||||
}
|
}
|
||||||
destroyReq(pReq);
|
destroyReq(pReq);
|
||||||
}
|
}
|
||||||
|
@ -1233,7 +1237,11 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||||
transDQCancel(pThrd->timeoutQueue, conn->task);
|
transDQCancel(pThrd->timeoutQueue, conn->task);
|
||||||
conn->task = NULL;
|
conn->task = NULL;
|
||||||
}
|
}
|
||||||
// cliResetTimer(pThrd, conn);
|
|
||||||
|
if (conn->pInitUserReq) {
|
||||||
|
taosMemoryFree(conn->pInitUserReq);
|
||||||
|
conn->pInitUserReq = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (clear) {
|
if (clear) {
|
||||||
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
|
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
|
||||||
|
@ -1270,6 +1278,10 @@ static void cliDestroy(uv_handle_t* handle) {
|
||||||
cliDestroyConnMsgs(conn, true);
|
cliDestroyConnMsgs(conn, true);
|
||||||
destroyCliConnQTable(conn);
|
destroyCliConnQTable(conn);
|
||||||
|
|
||||||
|
if (conn->pInitUserReq) {
|
||||||
|
taosMemoryFree(conn->pInitUserReq);
|
||||||
|
conn->pInitUserReq = NULL;
|
||||||
|
}
|
||||||
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||||
transReqQueueClear(&conn->wreqQueue);
|
transReqQueueClear(&conn->wreqQueue);
|
||||||
(void)transDestroyBuffer(&conn->readBuf);
|
(void)transDestroyBuffer(&conn->readBuf);
|
||||||
|
@ -1352,24 +1364,26 @@ static void cliSendBatch_shareConnCb(uv_write_t* req, int status) {
|
||||||
cliSendBatch_shareConn(conn);
|
cliSendBatch_shareConn(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
bool cliConnMayBuildInitPacket(SCliConn* pConn, STransMsgHead** pHead, int32_t msgLen) {
|
bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msgLen) {
|
||||||
SCliThrd* pThrd = pConn->hostThrd;
|
SCliThrd* pThrd = pConn->hostThrd;
|
||||||
STrans* pInst = pThrd->pInst;
|
STrans* pInst = pThrd->pInst;
|
||||||
if (pConn->inited == 1) {
|
if (pConn->userInited == 1) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
STransMsgHead* tHead = taosMemoryCalloc(1, msgLen + strlen(pInst->user));
|
STransMsgHead* pHead = *ppHead;
|
||||||
memcpy(tHead, pHead, TRANS_MSG_OVERHEAD);
|
STransMsgHead* tHead = taosMemoryCalloc(1, *msgLen + sizeof(pInst->user));
|
||||||
memcpy(tHead + TRANS_MSG_OVERHEAD, pInst->user, strlen(pInst->user));
|
memcpy((char*)tHead, (char*)pHead, TRANS_MSG_OVERHEAD);
|
||||||
|
memcpy((char*)tHead + TRANS_MSG_OVERHEAD, pInst->user, sizeof(pInst->user));
|
||||||
|
|
||||||
memcpy(tHead + TRANS_MSG_OVERHEAD + strlen(pInst->user), (char*)pHead + TRANS_MSG_OVERHEAD,
|
memcpy((char*)tHead + TRANS_MSG_OVERHEAD + sizeof(pInst->user), (char*)pHead + TRANS_MSG_OVERHEAD,
|
||||||
msgLen - TRANS_MSG_OVERHEAD);
|
*msgLen - TRANS_MSG_OVERHEAD);
|
||||||
|
|
||||||
tHead->toInit = 1;
|
tHead->withUserInfo = 1;
|
||||||
*pHead = tHead;
|
*ppHead = tHead;
|
||||||
|
*msgLen += sizeof(pInst->user);
|
||||||
|
|
||||||
pConn->initPacket = tHead;
|
pConn->pInitUserReq = tHead;
|
||||||
pConn->inited = 1;
|
pConn->userInited = 1;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
void cliSendBatch_shareConn(SCliConn* pConn) {
|
void cliSendBatch_shareConn(SCliConn* pConn) {
|
||||||
|
@ -1397,20 +1411,20 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
|
||||||
pReq->contLen = 0;
|
pReq->contLen = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int msgLen = transMsgLenFromCont(pReq->contLen);
|
int32_t msgLen = transMsgLenFromCont(pReq->contLen);
|
||||||
|
|
||||||
STransMsgHead* pHead = transHeadFromCont(pReq->pCont);
|
STransMsgHead* pHead = transHeadFromCont(pReq->pCont);
|
||||||
|
|
||||||
if (cliConnMayBuildInitPacket(pConn, &pHead, msgLen)) {
|
char* content = pReq->pCont;
|
||||||
} else {
|
int32_t contLen = pReq->contLen;
|
||||||
pHead->toInit = 0;
|
if (cliConnMayAddUserInfo(pConn, &pHead, &msgLen)) {
|
||||||
|
content = transContFromHead(pHead);
|
||||||
|
contLen = transContLenFromMsg(msgLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHead->comp == 0) {
|
if (pHead->comp == 0) {
|
||||||
pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0;
|
pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0;
|
||||||
pHead->msgType = pReq->msgType;
|
pHead->msgType = pReq->msgType;
|
||||||
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||||
// memcpy(pHead->user, pInst->user, strlen(pInst->user));
|
|
||||||
pHead->traceId = pReq->info.traceId;
|
pHead->traceId = pReq->info.traceId;
|
||||||
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
||||||
pHead->version = TRANS_VER;
|
pHead->version = TRANS_VER;
|
||||||
|
@ -1421,8 +1435,8 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
|
||||||
pHead->qid = taosHton64(pReq->info.qId);
|
pHead->qid = taosHton64(pReq->info.qId);
|
||||||
|
|
||||||
if (pHead->comp == 0) {
|
if (pHead->comp == 0) {
|
||||||
if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) {
|
if (pInst->compressSize != -1 && pInst->compressSize < contLen) {
|
||||||
msgLen = transCompressMsg(pReq->pCont, pReq->contLen) + sizeof(STransMsgHead);
|
msgLen = transCompressMsg(content, contLen) + sizeof(STransMsgHead);
|
||||||
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -2984,10 +2998,8 @@ int32_t transReleaseCliHandle(void* handle) {
|
||||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||||
}
|
}
|
||||||
|
|
||||||
STransMsg tmsg = {.msgType = TDMT_SCH_TASK_RELEASE,
|
STransMsg tmsg = {
|
||||||
.info.handle = handle,
|
.msgType = TDMT_SCH_TASK_RELEASE, .info.handle = handle, .info.ahandle = (void*)0, .info.qId = (int64_t)handle};
|
||||||
.info.ahandle = (void*)0x9527,
|
|
||||||
.info.qId = (int64_t)handle};
|
|
||||||
|
|
||||||
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
|
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
|
||||||
|
|
||||||
|
|
|
@ -54,6 +54,7 @@ typedef struct SSvrConn {
|
||||||
int spi;
|
int spi;
|
||||||
char info[64];
|
char info[64];
|
||||||
char user[TSDB_UNI_LEN]; // user ID for the link
|
char user[TSDB_UNI_LEN]; // user ID for the link
|
||||||
|
int8_t userInited;
|
||||||
char secret[TSDB_PASSWORD_LEN];
|
char secret[TSDB_PASSWORD_LEN];
|
||||||
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
|
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
|
||||||
|
|
||||||
|
@ -463,10 +464,31 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool uvExtractFisrtInitPacket() {
|
bool uvConnMayGetUserInfo(SSvrConn* pConn, STransMsgHead** ppHead, int32_t* msgLen) {
|
||||||
|
if (pConn->userInited) {
|
||||||
return true;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STrans* pInst = pConn->pInst;
|
||||||
|
STransMsgHead* pHead = *ppHead;
|
||||||
|
int32_t len = *msgLen;
|
||||||
|
if (pHead->withUserInfo) {
|
||||||
|
STransMsgHead* tHead = taosMemoryCalloc(1, len - sizeof(pInst->user));
|
||||||
|
memcpy((char*)tHead, (char*)pHead, TRANS_MSG_OVERHEAD);
|
||||||
|
memcpy((char*)tHead + TRANS_MSG_OVERHEAD, (char*)pHead + TRANS_MSG_OVERHEAD + sizeof(pInst->user),
|
||||||
|
len - sizeof(STransMsgHead) - sizeof(pInst->user));
|
||||||
|
tHead->msgLen = htonl(htonl(pHead->msgLen) - sizeof(pInst->user));
|
||||||
|
|
||||||
|
memcpy(pConn->user, (char*)pHead + TRANS_MSG_OVERHEAD, sizeof(pConn->user));
|
||||||
|
pConn->userInited = 1;
|
||||||
|
|
||||||
|
taosMemoryFree(pHead);
|
||||||
|
*ppHead = tHead;
|
||||||
|
*msgLen = len - sizeof(pInst->user);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
static bool uvHandleReq(SSvrConn* pConn) {
|
static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
STrans* pInst = pConn->pInst;
|
STrans* pInst = pConn->pInst;
|
||||||
SWorkThrd* pThrd = pConn->hostThrd;
|
SWorkThrd* pThrd = pConn->hostThrd;
|
||||||
|
@ -479,6 +501,9 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
tError("%s conn %p read invalid packet", transLabel(pInst), pConn);
|
tError("%s conn %p read invalid packet", transLabel(pInst), pConn);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (uvConnMayGetUserInfo(pConn, &pHead, &msgLen) == true) {
|
||||||
|
}
|
||||||
|
|
||||||
if (resetBuf == 0) {
|
if (resetBuf == 0) {
|
||||||
tTrace("%s conn %p not reset read buf", transLabel(pInst), pConn);
|
tTrace("%s conn %p not reset read buf", transLabel(pInst), pConn);
|
||||||
}
|
}
|
||||||
|
@ -487,12 +512,10 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pInst), pConn);
|
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pInst), pConn);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// pHead->ahandle = htole64(pHead->ahandle);
|
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
pHead->msgLen = htonl(pHead->msgLen);
|
pHead->msgLen = htonl(pHead->msgLen);
|
||||||
|
|
||||||
pConn->inType = pHead->msgType;
|
pConn->inType = pHead->msgType;
|
||||||
memcpy(pConn->user, pHead->user, strlen(pHead->user));
|
|
||||||
|
|
||||||
int8_t forbiddenIp = 0;
|
int8_t forbiddenIp = 0;
|
||||||
if (pThrd->enableIpWhiteList && tsEnableWhiteList) {
|
if (pThrd->enableIpWhiteList && tsEnableWhiteList) {
|
||||||
|
@ -697,6 +720,7 @@ static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) {
|
||||||
pHead->version = TRANS_VER;
|
pHead->version = TRANS_VER;
|
||||||
pHead->seqNum = htonl(pMsg->info.seqNum);
|
pHead->seqNum = htonl(pMsg->info.seqNum);
|
||||||
pHead->qid = taosHton64(pMsg->info.qId);
|
pHead->qid = taosHton64(pMsg->info.qId);
|
||||||
|
pHead->withUserInfo = pConn->userInited == 0 ? 1 : 0;
|
||||||
|
|
||||||
// handle invalid drop_task resp, TD-20098
|
// handle invalid drop_task resp, TD-20098
|
||||||
if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
|
if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
|
||||||
|
|
Loading…
Reference in New Issue