From 79f01ad65578be2694f1309b80fc98b4b7cb80ec Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 5 Jul 2023 08:16:25 +0000 Subject: [PATCH 1/8] add version check in rpc --- include/libs/transport/trpc.h | 2 + source/client/src/clientEnv.c | 45 ++++++++-------- source/client/src/clientImpl.c | 36 ++++++------- source/client/src/clientMain.c | 54 +++++++++---------- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 10 +++- source/libs/function/src/udfd.c | 3 +- .../libs/sync/test/sync_test_lib/src/syncIO.c | 26 ++++----- source/libs/transport/inc/transComm.h | 2 + source/libs/transport/inc/transportInt.h | 6 +-- source/libs/transport/src/trans.c | 1 + source/libs/transport/src/transCli.c | 11 ++-- source/libs/transport/src/transComm.c | 2 +- source/libs/transport/src/transSvr.c | 11 ++-- source/libs/transport/test/cliBench.c | 3 +- source/libs/transport/test/svrBench.c | 7 ++- tools/shell/src/shellNettest.c | 4 ++ 16 files changed, 123 insertions(+), 100 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index c73e5c127a..93e4d72ad7 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -46,6 +46,7 @@ typedef struct SRpcHandleInfo { int8_t noResp; // has response or not(default 0, 0: resp, 1: no resp) int8_t persistHandle; // persist handle or not int8_t hasEpSet; + int32_t cliVer; // app info void *ahandle; // app handle set by client @@ -83,6 +84,7 @@ typedef struct SRpcInit { int32_t sessions; // number of sessions allowed int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int32_t idleTime; // milliseconds, 0 means idle timer is disabled + int32_t compatibilityVer; int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index c64bbfbdb6..238b3613f5 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -29,6 +29,7 @@ #include "trpc.h" #include "tsched.h" #include "ttime.h" +#include "tversion.h" #if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL) #include "cus_name.h" @@ -111,7 +112,8 @@ static void deregisterRequest(SRequestObj *pRequest) { atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); if (tsSlowLogScope & reqType) { taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s", - taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, pRequest->sqlstr); + taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, + pRequest->sqlstr); } } @@ -175,6 +177,8 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); + void *pDnodeConn = rpcOpen(&rpcInit); if (pDnodeConn == NULL) { tscError("failed to init connection to server"); @@ -358,17 +362,16 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); } - void destroySubRequests(SRequestObj *pRequest) { - int32_t reqIdx = -1; + int32_t reqIdx = -1; SRequestObj *pReqList[16] = {NULL}; - uint64_t tmpRefId = 0; + uint64_t tmpRefId = 0; if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) { return; } - - SRequestObj* pTmp = pRequest; + + SRequestObj *pTmp = pRequest; while (pTmp->relation.prevRefId) { tmpRefId = pTmp->relation.prevRefId; pTmp = acquireRequest(tmpRefId); @@ -376,9 +379,9 @@ void destroySubRequests(SRequestObj *pRequest) { pReqList[++reqIdx] = pTmp; releaseRequest(tmpRefId); } else { - tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, - tmpRefId, pTmp->requestId); - break; + tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId, + pTmp->requestId); + break; } } @@ -391,16 +394,15 @@ void destroySubRequests(SRequestObj *pRequest) { pTmp = acquireRequest(tmpRefId); if (pTmp) { tmpRefId = pTmp->relation.nextRefId; - removeRequest(pTmp->self); + removeRequest(pTmp->self); releaseRequest(pTmp->self); } else { tscError("0x%" PRIx64 " is not there", tmpRefId); - break; + break; } } } - void doDestroyRequest(void *p) { if (NULL == p) { return; @@ -412,7 +414,7 @@ void doDestroyRequest(void *p) { tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); destroySubRequests(pRequest); - + taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); schedulerFreeJob(&pRequest->body.queryJob, 0); @@ -473,15 +475,15 @@ void taosStopQueryImpl(SRequestObj *pRequest) { } void stopAllQueries(SRequestObj *pRequest) { - int32_t reqIdx = -1; + int32_t reqIdx = -1; SRequestObj *pReqList[16] = {NULL}; - uint64_t tmpRefId = 0; + uint64_t tmpRefId = 0; if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) { return; } - - SRequestObj* pTmp = pRequest; + + SRequestObj *pTmp = pRequest; while (pTmp->relation.prevRefId) { tmpRefId = pTmp->relation.prevRefId; pTmp = acquireRequest(tmpRefId); @@ -489,9 +491,9 @@ void stopAllQueries(SRequestObj *pRequest) { pReqList[++reqIdx] = pTmp; releaseRequest(tmpRefId); } else { - tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, - tmpRefId, pTmp->requestId); - break; + tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId, + pTmp->requestId); + break; } } @@ -510,12 +512,11 @@ void stopAllQueries(SRequestObj *pRequest) { releaseRequest(pTmp->self); } else { tscError("0x%" PRIx64 " is not there", tmpRefId); - break; + break; } } } - void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); } static void *tscCrashReportThreadFp(void *param) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 955c90fc81..14d6394fc4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -26,7 +26,7 @@ #include "tpagedbuf.h" #include "tref.h" #include "tsched.h" - +#include "tversion.h" static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); @@ -237,8 +237,9 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, return TSDB_CODE_SUCCESS; } -int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest) { - int32_t code = buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0); +int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) { + int32_t code = + buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0); if (TSDB_CODE_SUCCESS == code) { pRequest->relation.prevRefId = (*pNewRequest)->self; (*pNewRequest)->relation.nextRefId = pRequest->self; @@ -502,8 +503,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t pResInfo->userFields[i].bytes = pSchema[i].bytes; pResInfo->userFields[i].type = pSchema[i].type; - if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || - pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) { + if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) { pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE; } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) { pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; @@ -891,7 +891,7 @@ static bool incompletaFileParsing(SNode* pStmt) { void continuePostSubQuery(SRequestObj* pRequest, TAOS_ROW row) { SSqlCallbackWrapper* pWrapper = pRequest->pWrapper; - int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId); + int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId); if (TSDB_CODE_SUCCESS == code) { int64_t analyseStart = taosGetTimestampUs(); code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, (void**)row); @@ -934,7 +934,7 @@ void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) { TAOS_ROW row = NULL; if (rowNum > 0) { - row = taos_fetch_row(res); // for single row only now + row = taos_fetch_row(res); // for single row only now } SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId); @@ -2135,6 +2135,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) { @@ -2494,11 +2495,10 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, return pRequest; } +static void fetchCallback(void* pResult, void* param, int32_t code) { + SRequestObj* pRequest = (SRequestObj*)param; -static void fetchCallback(void *pResult, void *param, int32_t code) { - SRequestObj *pRequest = (SRequestObj *)param; - - SReqResultInfo *pResultInfo = &pRequest->body.resInfo; + SReqResultInfo* pResultInfo = &pRequest->body.resInfo; tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); @@ -2520,7 +2520,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) { } pRequest->code = - setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true); + setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, true); if (pRequest->code != TSDB_CODE_SUCCESS) { pResultInfo->numOfRows = 0; pRequest->code = code; @@ -2531,19 +2531,19 @@ static void fetchCallback(void *pResult, void *param, int32_t code) { pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId); - STscObj *pTscObj = pRequest->pTscObj; - SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; - atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); + STscObj* pTscObj = pRequest->pTscObj; + SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary; + atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); } pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); } -void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param) { +void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) { pRequest->body.fetchFp = fp; pRequest->body.param = param; - SReqResultInfo *pResultInfo = &pRequest->body.resInfo; + SReqResultInfo* pResultInfo = &pRequest->body.resInfo; // this query has no results or error exists, return directly if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) { @@ -2578,5 +2578,3 @@ void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param schedulerFetchRows(pRequest->body.queryJob, &req); } - - diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 7ce838cd2c..e262ee04b9 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -558,13 +558,12 @@ int taos_select_db(TAOS *taos, const char *db) { return code; } - void taos_stop_query(TAOS_RES *res) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { return; } - stopAllQueries((SRequestObj*)res); + stopAllQueries((SRequestObj *)res); } bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { @@ -785,7 +784,7 @@ void destorySqlCallbackWrapper(SSqlCallbackWrapper *pWrapper) { taosMemoryFree(pWrapper); } -void destroyCtxInRequest(SRequestObj* pRequest) { +void destroyCtxInRequest(SRequestObj *pRequest) { schedulerFreeJob(&pRequest->body.queryJob, 0); qDestroyQuery(pRequest->pQuery); pRequest->pQuery = NULL; @@ -793,7 +792,6 @@ void destroyCtxInRequest(SRequestObj* pRequest) { pRequest->pWrapper = NULL; } - static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t code) { SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param; SRequestObj *pRequest = pWrapper->pRequest; @@ -807,15 +805,15 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t if (TSDB_CODE_SUCCESS == code) { code = qAnalyseSqlSemantic(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery); } - + pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart; - + handleQueryAnslyseRes(pWrapper, pResultMeta, code); } -int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) { - int32_t code = TSDB_CODE_SUCCESS; - SCatalogReq* pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq)); +int32_t cloneCatalogReq(SCatalogReq **ppTarget, SCatalogReq *pSrc) { + int32_t code = TSDB_CODE_SUCCESS; + SCatalogReq *pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq)); if (pTarget == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; } else { @@ -842,17 +840,16 @@ int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) { return code; } - -void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode* pRoot) { - SRequestObj* pNewRequest = NULL; - SSqlCallbackWrapper* pNewWrapper = NULL; - int32_t code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest); +void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode *pRoot) { + SRequestObj *pNewRequest = NULL; + SSqlCallbackWrapper *pNewWrapper = NULL; + int32_t code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest); if (code) { handleQueryAnslyseRes(pWrapper, pResultMeta, code); return; } - pNewRequest->pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + pNewRequest->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY); if (NULL == pNewRequest->pQuery) { code = TSDB_CODE_OUT_OF_MEMORY; } else { @@ -871,16 +868,16 @@ void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResult } void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code) { - SRequestObj *pRequest = pWrapper->pRequest; - SQuery *pQuery = pRequest->pQuery; + SRequestObj *pRequest = pWrapper->pRequest; + SQuery *pQuery = pRequest->pQuery; if (code == TSDB_CODE_SUCCESS && pQuery->pPrevRoot) { - SNode* prevRoot = pQuery->pPrevRoot; + SNode *prevRoot = pQuery->pPrevRoot; pQuery->pPrevRoot = NULL; handleSubQueryFromAnalyse(pWrapper, pResultMeta, prevRoot); return; } - + if (code == TSDB_CODE_SUCCESS) { pRequest->stableQuery = pQuery->stableQuery; if (pQuery->pRoot) { @@ -1043,7 +1040,7 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { } int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; STscObj *pTscObj = pRequest->pTscObj; SSqlCallbackWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); if (pWrapper == NULL) { @@ -1081,7 +1078,6 @@ int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *p return code; } - void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { SSqlCallbackWrapper *pWrapper = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -1128,12 +1124,12 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { } void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { - int32_t reqIdx = 0; + int32_t reqIdx = 0; SRequestObj *pReqList[16] = {NULL}; SRequestObj *pUserReq = NULL; pReqList[0] = pRequest; - uint64_t tmpRefId = 0; - SRequestObj* pTmp = pRequest; + uint64_t tmpRefId = 0; + SRequestObj *pTmp = pRequest; while (pTmp->relation.prevRefId) { tmpRefId = pTmp->relation.prevRefId; pTmp = acquireRequest(tmpRefId); @@ -1141,9 +1137,9 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { pReqList[++reqIdx] = pTmp; releaseRequest(tmpRefId); } else { - tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, - tmpRefId, pTmp->requestId); - break; + tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId, + pTmp->requestId); + break; } } @@ -1152,11 +1148,11 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { pTmp = acquireRequest(tmpRefId); if (pTmp) { tmpRefId = pTmp->relation.nextRefId; - removeRequest(pTmp->self); + removeRequest(pTmp->self); releaseRequest(pTmp->self); } else { tscError("0x%" PRIx64 " is not there", tmpRefId); - break; + break; } } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index ea46b70693..e6c2fe114f 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" #include "qworker.h" +#include "tversion.h" static inline void dmSendRsp(SRpcMsg *pMsg) { rpcSendResponse(pMsg); } @@ -73,6 +74,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType), pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId); + int32_t svrVer = 0; + taosVersionStrToInt(version, &svrVer); + if (0 != taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) { + goto _OVER; + } + switch (pRpc->msgType) { case TDMT_DND_NET_TEST: dmProcessNetTestReq(pDnode, pRpc); @@ -305,6 +312,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { @@ -339,7 +347,7 @@ int32_t dmInitServer(SDnode *pDnode) { rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.parent = pDnode; rpcInit.compressSize = tsCompressMsgSize; - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); pTrans->serverRpc = rpcOpen(&rpcInit); if (pTrans->serverRpc == NULL) { dError("failed to init dnode rpc server"); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 3cc7c052cc..7371017111 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -29,6 +29,7 @@ #include "tmsg.h" #include "trpc.h" #include "tmisce.h" +#include "tversion.h" // clang-format on #define UDFD_MAX_SCRIPT_PLUGINS 64 @@ -1038,7 +1039,7 @@ int32_t udfdOpenClientRpc() { connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); global.clientRpc = rpcOpen(&rpcInit); if (global.clientRpc == NULL) { fnError("failed to init dnode rpc client"); diff --git a/source/libs/sync/test/sync_test_lib/src/syncIO.c b/source/libs/sync/test/sync_test_lib/src/syncIO.c index 2e00785586..4f8ae59348 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncIO.c +++ b/source/libs/sync/test/sync_test_lib/src/syncIO.c @@ -21,6 +21,7 @@ #include "tglobal.h" #include "ttimer.h" #include "tutil.h" +#include "tversion.h" bool gRaftDetailLog = false; SSyncIO *gSyncIO = NULL; @@ -188,7 +189,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) { rpcInit.idleTime = 100; rpcInit.user = "sync-io"; rpcInit.connType = TAOS_CONN_CLIENT; - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); io->clientRpc = rpcOpen(&rpcInit); if (io->clientRpc == NULL) { sError("failed to initialize RPC"); @@ -209,7 +210,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) { rpcInit.idleTime = 2 * 1500; rpcInit.parent = io; rpcInit.connType = TAOS_CONN_SERVER; - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); void *pRpc = rpcOpen(&rpcInit); if (pRpc == NULL) { sError("failed to start RPC server"); @@ -470,11 +471,10 @@ static void syncIOTickPing(void *param, void *tmrId) { taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer); } -void syncEntryDestory(SSyncRaftEntry* pEntry) {} +void syncEntryDestory(SSyncRaftEntry *pEntry) {} - -void syncUtilMsgNtoH(void* msg) { - SMsgHead* pHead = msg; +void syncUtilMsgNtoH(void *msg) { + SMsgHead *pHead = msg; pHead->contLen = ntohl(pHead->contLen); pHead->vgId = ntohl(pHead->vgId); } @@ -487,9 +487,9 @@ static inline bool syncUtilCanPrint(char c) { } } -char* syncUtilPrintBin(char* ptr, uint32_t len) { +char *syncUtilPrintBin(char *ptr, uint32_t len) { int64_t memLen = (int64_t)(len + 1); - char* s = taosMemoryMalloc(memLen); + char *s = taosMemoryMalloc(memLen); ASSERT(s != NULL); memset(s, 0, len + 1); memcpy(s, ptr, len); @@ -502,13 +502,13 @@ char* syncUtilPrintBin(char* ptr, uint32_t len) { return s; } -char* syncUtilPrintBin2(char* ptr, uint32_t len) { +char *syncUtilPrintBin2(char *ptr, uint32_t len) { uint32_t len2 = len * 4 + 1; - char* s = taosMemoryMalloc(len2); + char *s = taosMemoryMalloc(len2); ASSERT(s != NULL); memset(s, 0, len2); - char* p = s; + char *p = s; for (int32_t i = 0; i < len; ++i) { int32_t n = sprintf(p, "%d,", ptr[i]); p += n; @@ -516,7 +516,7 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len) { return s; } -void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) { +void syncUtilU642Addr(uint64_t u64, char *host, int64_t len, uint16_t *port) { uint32_t hostU32 = (uint32_t)((u64 >> 32) & 0x00000000FFFFFFFF); struct in_addr addr = {.s_addr = hostU32}; @@ -524,7 +524,7 @@ void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) { *port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16); } -uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { +uint64_t syncUtilAddr2U64(const char *host, uint16_t port) { uint32_t hostU32 = taosGetIpv4FromFqdn(host); if (hostU32 == (uint32_t)-1) { sError("failed to resolve ipv4 addr, host:%s", host); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a2c486767f..f5570f6afd 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -154,6 +154,7 @@ typedef struct { #pragma pack(push, 1) +#define TRANS_VER 2 typedef struct { char version : 4; // RPC version char comp : 2; // compression algorithm, 0:no compression 1:lz4 @@ -167,6 +168,7 @@ typedef struct { uint64_t timestamp; char user[TSDB_UNI_LEN]; uint32_t magicNum; + uint32_t compatibilityVer; STraceId traceId; uint64_t ahandle; // ahandle assigned by client uint32_t code; // del later diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 8ea0064d44..9ef2373b3f 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -46,9 +46,9 @@ typedef struct { int8_t connType; char label[TSDB_LABEL_LEN]; char user[TSDB_UNI_LEN]; // meter ID - - int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size - int8_t encryption; // encrypt or not + int32_t compatibilityVer; + int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size + int8_t encryption; // encrypt or not int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 0771f9198a..08b0451982 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -50,6 +50,7 @@ void* rpcOpen(const SRpcInit* pInit) { } pRpc->encryption = pInit->encryption; + pRpc->compatibilityVer = pInit->compatibilityVer; pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval pRpc->retryStepFactor = pInit->retryStepFactor; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1709fc3cb1..8af4427e22 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -984,11 +984,10 @@ void cliSendBatch(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - SCliBatch* pBatch = pConn->pBatch; - SCliBatchList* pList = pBatch->pList; - pList->connCnt += 1; + SCliBatch* pBatch = pConn->pBatch; + int32_t wLen = pBatch->wLen; - int32_t wLen = pBatch->wLen; + pBatch->pList->connCnt += 1; uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t)); int i = 0; @@ -1018,6 +1017,8 @@ void cliSendBatch(SCliConn* pConn) { memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); pHead->traceId = pMsg->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); + pHead->version = TRANS_VER; + pHead->compatibilityVer = htonl(pTransInst->compatibilityVer); } pHead->timestamp = taosHton64(taosGetTimestampUs()); @@ -1074,6 +1075,8 @@ void cliSend(SCliConn* pConn) { memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); pHead->traceId = pMsg->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); + pHead->version = TRANS_VER; + pHead->compatibilityVer = htonl(pTransInst->compatibilityVer); } pHead->timestamp = taosHton64(taosGetTimestampUs()); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 0dfc7677b3..b14db9497e 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -192,7 +192,7 @@ bool transReadComplete(SConnBuffer* connBuf) { memcpy((char*)&head, connBuf->buf, sizeof(head)); int32_t msgLen = (int32_t)htonl(head.msgLen); p->total = msgLen; - p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)); + p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)) || head.version != TRANS_VER; } if (p->total >= p->len) { p->left = p->total - p->len; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index da3b0ad626..496295938a 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -236,8 +236,8 @@ static bool uvHandleReq(SSvrConn* pConn) { if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); if (cost >= EXCEPTION_LIMIT_US) { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", transLabel(pTransInst), - pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", + transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); } else { tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); @@ -245,8 +245,8 @@ static bool uvHandleReq(SSvrConn* pConn) { } else { if (cost >= EXCEPTION_LIMIT_US) { tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception", - transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, - transMsg.code, (int)(cost)); + transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, + transMsg.code, (int)(cost)); } else { tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus", transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, @@ -262,6 +262,7 @@ static bool uvHandleReq(SSvrConn* pConn) { transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId); transMsg.info.refId = pConn->refId; transMsg.info.traceId = pHead->traceId; + transMsg.info.cliVer = htonl(pHead->compatibilityVer); tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn, pConn->refId); @@ -410,6 +411,8 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; pHead->magicNum = htonl(TRANS_MAGIC_NUM); + pHead->compatibilityVer = 0; + pHead->version = TRANS_VER; // handle invalid drop_task resp, TD-20098 if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { diff --git a/source/libs/transport/test/cliBench.c b/source/libs/transport/test/cliBench.c index aaee162cd7..8a5276b814 100644 --- a/source/libs/transport/test/cliBench.c +++ b/source/libs/transport/test/cliBench.c @@ -19,6 +19,7 @@ #include "transLog.h" #include "trpc.h" #include "tutil.h" +#include "tversion.h" typedef struct { int index; @@ -155,7 +156,7 @@ int main(int argc, char *argv[]) { } initLogEnv(); - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); void *pRpc = rpcOpen(&rpcInit); if (pRpc == NULL) { tError("failed to initialize RPC"); diff --git a/source/libs/transport/test/svrBench.c b/source/libs/transport/test/svrBench.c index 4e2395b17b..a3fa81662c 100644 --- a/source/libs/transport/test/svrBench.c +++ b/source/libs/transport/test/svrBench.c @@ -13,12 +13,13 @@ * along with this program. If not, see . */ -//#define _DEFAULT_SOURCE +// #define _DEFAULT_SOURCE #include "os.h" #include "tglobal.h" #include "tqueue.h" #include "transLog.h" #include "trpc.h" +#include "tversion.h" int msgSize = 128; int commit = 0; @@ -151,6 +152,8 @@ int main(int argc, char *argv[]) { rpcInit.numOfThreads = 1; rpcInit.cfp = processRequestMsg; rpcInit.idleTime = 2 * 1500; + + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); rpcDebugFlag = 131; for (int i = 1; i < argc; ++i) { @@ -187,7 +190,7 @@ int main(int argc, char *argv[]) { rpcInit.connType = TAOS_CONN_SERVER; initLogEnv(); - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); void *pRpc = rpcOpen(&rpcInit); if (pRpc == NULL) { tError("failed to start RPC server"); diff --git a/tools/shell/src/shellNettest.c b/tools/shell/src/shellNettest.c index 1a6ac3489d..9fe92212ca 100644 --- a/tools/shell/src/shellNettest.c +++ b/tools/shell/src/shellNettest.c @@ -15,6 +15,7 @@ #define _GNU_SOURCE #include "shellInt.h" +#include "tversion.h" static void shellWorkAsClient() { SShellArgs *pArgs = &shell.args; @@ -33,6 +34,7 @@ static void shellWorkAsClient() { rpcInit.user = "_dnd"; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) { printf("failed to init net test client since %s\r\n", terrstr()); @@ -123,6 +125,8 @@ static void shellWorkAsServer() { rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1000; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); + void *serverRpc = rpcOpen(&rpcInit); if (serverRpc == NULL) { printf("failed to init net test server since %s\r\n", terrstr()); From af053ec75bc0c0d65eb318b534ebe4101aef2721 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 5 Jul 2023 11:15:12 +0000 Subject: [PATCH 2/8] add version check in rpc --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 1 + source/libs/transport/src/transCli.c | 1 + source/libs/transport/src/transSvr.c | 3 ++- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index e6c2fe114f..a7a7273bd0 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -77,6 +77,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { int32_t svrVer = 0; taosVersionStrToInt(version, &svrVer); if (0 != taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) { + dError("msg ver: %d, curr ver: %d", pRpc->info.cliVer, svrVer); goto _OVER; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8af4427e22..e2d4983e57 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -391,6 +391,7 @@ void cliHandleResp(SCliConn* conn) { transMsg.info.ahandle = NULL; transMsg.info.traceId = pHead->traceId; transMsg.info.hasEpSet = pHead->hasEpSet; + transMsg.info.cliVer = pHead->compatibilityVer; SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 496295938a..0f165d04b2 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -196,6 +196,7 @@ static bool uvHandleReq(SSvrConn* pConn) { tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn); return false; } + tDebug("head version: %d 2", pHead->version); pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); @@ -411,7 +412,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; pHead->magicNum = htonl(TRANS_MAGIC_NUM); - pHead->compatibilityVer = 0; + pHead->compatibilityVer = ((STrans*)pConn->pTransInst)->compatibilityVer; pHead->version = TRANS_VER; // handle invalid drop_task resp, TD-20098 From d0d8d93c94f68bb1ac58271a38423140de622556 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 01:29:49 +0000 Subject: [PATCH 3/8] add version check in rpc --- source/libs/transport/src/transCli.c | 2 +- source/libs/transport/src/transSvr.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e2d4983e57..227fc63680 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -391,7 +391,7 @@ void cliHandleResp(SCliConn* conn) { transMsg.info.ahandle = NULL; transMsg.info.traceId = pHead->traceId; transMsg.info.hasEpSet = pHead->hasEpSet; - transMsg.info.cliVer = pHead->compatibilityVer; + transMsg.info.cliVer = htonl(pHead->compatibilityVer); SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 0f165d04b2..f23e176c79 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -412,7 +412,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; pHead->magicNum = htonl(TRANS_MAGIC_NUM); - pHead->compatibilityVer = ((STrans*)pConn->pTransInst)->compatibilityVer; + pHead->compatibilityVer = htonl(((STrans*)pConn->pTransInst)->compatibilityVer); pHead->version = TRANS_VER; // handle invalid drop_task resp, TD-20098 From a630d1284ca79f33d3043fe7ea56ecd37dadc13f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 02:03:31 +0000 Subject: [PATCH 4/8] add version check in rpc --- source/dnode/mgmt/test/sut/src/client.cpp | 2 ++ source/libs/transport/test/transUT.cpp | 8 ++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/test/sut/src/client.cpp b/source/dnode/mgmt/test/sut/src/client.cpp index a27a511651..95eea2359d 100644 --- a/source/dnode/mgmt/test/sut/src/client.cpp +++ b/source/dnode/mgmt/test/sut/src/client.cpp @@ -16,6 +16,7 @@ #include "sut.h" #include "tdatablock.h" #include "tmisce.h" +#include "tversion.h" static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) { TestClient* client = (TestClient*)parent; @@ -53,6 +54,7 @@ void TestClient::DoInit() { rpcInit.parent = this; // rpcInit.secret = (char*)secretEncrypt; // rpcInit.spi = 1; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); clientRpc = rpcOpen(&rpcInit); ASSERT(clientRpc); diff --git a/source/libs/transport/test/transUT.cpp b/source/libs/transport/test/transUT.cpp index 88a1e2564f..2fa94c358f 100644 --- a/source/libs/transport/test/transUT.cpp +++ b/source/libs/transport/test/transUT.cpp @@ -18,10 +18,10 @@ #include "tdatablock.h" #include "tglobal.h" #include "tlog.h" +#include "tmisce.h" #include "transLog.h" #include "trpc.h" -#include "tmisce.h" - +#include "tversion.h" using namespace std; const char *label = "APP"; @@ -54,6 +54,8 @@ class Client { rpcInit_.user = (char *)user; rpcInit_.parent = this; rpcInit_.connType = TAOS_CONN_CLIENT; + + taosVersionStrToInt(version, &(rpcInit_.compatibilityVer)); this->transCli = rpcOpen(&rpcInit_); tsem_init(&this->sem, 0, 0); } @@ -66,6 +68,7 @@ class Client { void Restart(CB cb) { rpcClose(this->transCli); rpcInit_.cfp = cb; + taosVersionStrToInt(version, &(rpcInit_.compatibilityVer)); this->transCli = rpcOpen(&rpcInit_); } void Stop() { @@ -117,6 +120,7 @@ class Server { rpcInit_.cfp = processReq; rpcInit_.user = (char *)user; rpcInit_.connType = TAOS_CONN_SERVER; + taosVersionStrToInt(version, &(rpcInit_.compatibilityVer)); } void Start() { this->transSrv = rpcOpen(&this->rpcInit_); From d061c54a180e4d9b239c606c0eeb63942c839b5c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 06:41:06 +0000 Subject: [PATCH 5/8] add ver check --- source/libs/transport/inc/transportInt.h | 2 +- source/libs/transport/src/transCli.c | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 9ef2373b3f..ca48da690b 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -49,7 +49,7 @@ typedef struct { int32_t compatibilityVer; int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int8_t encryption; // encrypt or not - + int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor int32_t retryMaxInterval; // retry max interval diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 227fc63680..8062a0618b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -489,6 +489,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; transMsg.info.ahandle = NULL; + transMsg.info.cliVer = pTransInst->compatibilityVer; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); @@ -1350,6 +1351,7 @@ static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) { transMsg.info.ahandle = pMsg->ctx->ahandle; transMsg.info.traceId = pMsg->msg.info.traceId; transMsg.info.hasEpSet = false; + transMsg.info.cliVer = pTransInst->compatibilityVer; if (pCtx->pSem != NULL) { if (pCtx->pRsp == NULL) { } else { @@ -1531,6 +1533,9 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { // persist conn already release by server STransMsg resp; cliBuildExceptResp(pMsg, &resp); + // refactorr later + resp.info.cliVer = pTransInst->compatibilityVer; + if (pMsg->type != Release) { pTransInst->cfp(pTransInst->parent, &resp, NULL); } @@ -1840,6 +1845,7 @@ void cliIteraConnMsgs(SCliConn* conn) { if (-1 == cliBuildExceptResp(cmsg, &resp)) { continue; } + resp.info.cliVer = pTransInst->compatibilityVer; pTransInst->cfp(pTransInst->parent, &resp, NULL); cmsg->ctx->ahandle = NULL; From 5e156aa0a9b0eaf153ec2bf0ca207e498529a76b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 06:43:07 +0000 Subject: [PATCH 6/8] add ver check --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index a7a7273bd0..5d6d16ccf8 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -77,7 +77,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { int32_t svrVer = 0; taosVersionStrToInt(version, &svrVer); if (0 != taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) { - dError("msg ver: %d, curr ver: %d", pRpc->info.cliVer, svrVer); + dError("Version not compatible, cli ver: %d, svr ver: %d", pRpc->info.cliVer, svrVer); goto _OVER; } From 196f2b8f65f8676ae8d96125c24a6f996edb42b1 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Thu, 6 Jul 2023 16:45:53 +0800 Subject: [PATCH 7/8] test:comment connection of new client and old server --- tests/system-test/0-others/compatibility.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py index 22e319fdaf..b8623c3bcd 100644 --- a/tests/system-test/0-others/compatibility.py +++ b/tests/system-test/0-others/compatibility.py @@ -151,9 +151,10 @@ class TDTestCase: os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '") os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql") - cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;" - if os.system(cmd) == 0: - raise Exception("failed to execute system command. cmd: %s" % cmd) + # cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;" + # tdLog.info(f"new client version connect to old version taosd, commad return value:{cmd}") + # if os.system(cmd) == 0: + # raise Exception("failed to execute system command. cmd: %s" % cmd) os.system("pkill taosd") # make sure all the data are saved in disk. self.checkProcessPid("taosd") From 3bd99776d28bf7d537a44aa2453ffab6ccda88a5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 18:03:15 +0800 Subject: [PATCH 8/8] add rpc version check --- source/libs/transport/inc/transComm.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index f5570f6afd..3b304e2c77 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -167,8 +167,8 @@ typedef struct { uint64_t timestamp; char user[TSDB_UNI_LEN]; + int32_t compatibilityVer; uint32_t magicNum; - uint32_t compatibilityVer; STraceId traceId; uint64_t ahandle; // ahandle assigned by client uint32_t code; // del later