[TD-1827]<enhance>: force version check for client messages

This commit is contained in:
stephenkgu 2020-12-12 09:19:15 +08:00
parent 6895dae5fb
commit 9e5f835ac7
7 changed files with 19 additions and 10 deletions

View File

@ -214,7 +214,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj; STscObj* pObj = pSql->pTscObj;
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
char *pMsg = rpcMallocCont(pCmd->payloadLen); char *pMsg = rpcMallocCont(sizeof(SMsgVersion) + pCmd->payloadLen);
if (NULL == pMsg) { if (NULL == pMsg) {
tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]); tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
@ -225,7 +225,8 @@ int tscSendMsgToServer(SSqlObj *pSql) {
tscDumpMgmtEpSet(pSql); tscDumpMgmtEpSet(pSql);
} }
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); tstrncpy(pMsg, version, sizeof(SMsgVersion));
memcpy(pMsg + sizeof(SMsgVersion), pSql->cmd.payload, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType, .msgType = pSql->cmd.msgType,

View File

@ -124,8 +124,6 @@ void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) {
SMnodeMsg *pRead = mnodeCreateMsg(pMsg); SMnodeMsg *pRead = mnodeCreateMsg(pMsg);
taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead); taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead);
} }
rpcFreeCont(pMsg->pCont);
} }
static void dnodeFreeMReadMsg(SMnodeMsg *pRead) { static void dnodeFreeMReadMsg(SMnodeMsg *pRead) {

View File

@ -125,8 +125,6 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
} }
rpcFreeCont(pMsg->pCont);
} }
static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) { static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) {

View File

@ -127,7 +127,18 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
} else {} } else {}
if ( dnodeProcessShellMsgFp[pMsg->msgType] ) { if ( dnodeProcessShellMsgFp[pMsg->msgType] ) {
SMsgVersion *pMsgVersion = pMsg->pCont;
if (taosCheckVersion(pMsgVersion->clientVersion, version, 3) != TSDB_CODE_SUCCESS) {
rpcMsg.code = TSDB_CODE_TSC_INVALID_VERSION;
rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont);
return; // todo change the error code
}
pMsg->pCont += sizeof(*pMsgVersion);
(*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg); (*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg);
rpcFreeCont(pMsg->pCont - sizeof(*pMsgVersion));
} else { } else {
dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]);
rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
@ -231,4 +242,4 @@ SStatisInfo dnodeGetStatisInfo() {
} }
return info; return info;
} }

View File

@ -77,8 +77,6 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
rpcFreeCont(pMsg->pCont);
} }
void *dnodeAllocVQueryQueue(void *pVnode) { void *dnodeAllocVQueryQueue(void *pVnode) {

View File

@ -102,7 +102,6 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) {
} }
vnodeRelease(pVnode); vnodeRelease(pVnode);
rpcFreeCont(pRpcMsg->pCont);
} }
void *dnodeAllocVWriteQueue(void *pVnode) { void *dnodeAllocVWriteQueue(void *pVnode) {

View File

@ -198,6 +198,10 @@ typedef struct {
int32_t numOfVnodes; int32_t numOfVnodes;
} SMsgDesc; } SMsgDesc;
typedef struct SMsgVersion {
char clientVersion[TSDB_VERSION_LEN];
} SMsgVersion;
typedef struct SMsgHead { typedef struct SMsgHead {
int32_t contLen; int32_t contLen;
int32_t vgId; int32_t vgId;