Merge pull request #29003 from taosdata/enh/TD-31388

check msg len before send req
This commit is contained in:
Shengliang Guan 2024-12-03 13:37:12 +08:00 committed by GitHub
commit 68dca47759
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 19 additions and 3 deletions

View File

@ -97,6 +97,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_RPC_ASYNC_IN_PROCESS TAOS_DEF_ERROR_CODE(0, 0x0028) #define TSDB_CODE_RPC_ASYNC_IN_PROCESS TAOS_DEF_ERROR_CODE(0, 0x0028)
#define TSDB_CODE_RPC_NO_STATE TAOS_DEF_ERROR_CODE(0, 0x0029) #define TSDB_CODE_RPC_NO_STATE TAOS_DEF_ERROR_CODE(0, 0x0029)
#define TSDB_CODE_RPC_STATE_DROPED TAOS_DEF_ERROR_CODE(0, 0x002A) #define TSDB_CODE_RPC_STATE_DROPED TAOS_DEF_ERROR_CODE(0, 0x002A)
#define TSDB_CODE_RPC_MSG_EXCCED_LIMIT TAOS_DEF_ERROR_CODE(0, 0x002B)
//common & util //common & util
#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) // #define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) //

View File

@ -99,6 +99,7 @@ typedef void* queue[2];
#define TRANS_CONN_TIMEOUT 5000 // connect timeout (ms) #define TRANS_CONN_TIMEOUT 5000 // connect timeout (ms)
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 #define TRANS_PACKET_LIMIT 1024 * 1024 * 512
#define TRANS_MSG_LIMIT (TRANS_PACKET_LIMIT - sizeof(STransMsgHead))
#define TRANS_MAGIC_NUM 0x5f375a86 #define TRANS_MAGIC_NUM 0x5f375a86
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) #define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)

View File

@ -377,6 +377,12 @@ static FORCE_INLINE void logConnMissHit(SCliConn* pConn);
static void* cliWorkThread(void* arg); static void* cliWorkThread(void* arg);
static bool isReqExceedLimit(STransMsg* pMsg) {
if (pMsg != NULL && pMsg->contLen >= TRANS_MSG_LIMIT) {
return true;
}
return false;
}
int32_t cliGetConnTimer(SCliThrd* pThrd, SCliConn* pConn) { int32_t cliGetConnTimer(SCliThrd* pThrd, SCliConn* pConn) {
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
if (timer == NULL) { if (timer == NULL) {
@ -3209,6 +3215,10 @@ _exception:
} }
int32_t transSendRequest(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { int32_t transSendRequest(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
if (isReqExceedLimit(pReq)) {
return TSDB_CODE_RPC_MSG_EXCCED_LIMIT;
}
STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef); STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef);
if (pInst == NULL) { if (pInst == NULL) {
transFreeMsg(pReq->pCont); transFreeMsg(pReq->pCont);
@ -3236,9 +3246,6 @@ int32_t transSendRequest(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq,
return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
} }
// if (pReq->msgType == TDMT_SCH_DROP_TASK) {
// TAOS_UNUSED(transReleaseCliHandle(pReq->info.handle));
// }
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
return 0; return 0;
@ -3255,6 +3262,9 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
if (transpointId == NULL) { if (transpointId == NULL) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
if (isReqExceedLimit(pReq)) {
return TSDB_CODE_RPC_MSG_EXCCED_LIMIT;
}
int32_t code = 0; int32_t code = 0;
int8_t transIdInited = 0; int8_t transIdInited = 0;
@ -3306,6 +3316,9 @@ _exception:
} }
int32_t transSendRecv(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { int32_t transSendRecv(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
if (isReqExceedLimit(pReq)) {
return TSDB_CODE_RPC_MSG_EXCCED_LIMIT;
}
STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef); STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef);
if (pInst == NULL) { if (pInst == NULL) {
transFreeMsg(pReq->pCont); transFreeMsg(pReq->pCont);

View File

@ -63,6 +63,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module alre
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_IN_PROCESS, "rpc async in process") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_IN_PROCESS, "rpc async in process")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NO_STATE, "rpc no state") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NO_STATE, "rpc no state")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_STATE_DROPED, "rpc state already dropped") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_STATE_DROPED, "rpc state already dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MSG_EXCCED_LIMIT, "rpc msg exceed limit")
//common & util //common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")