From 79f01ad65578be2694f1309b80fc98b4b7cb80ec Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 5 Jul 2023 08:16:25 +0000 Subject: [PATCH 01/12] add version check in rpc --- include/libs/transport/trpc.h | 2 + source/client/src/clientEnv.c | 45 ++++++++-------- source/client/src/clientImpl.c | 36 ++++++------- source/client/src/clientMain.c | 54 +++++++++---------- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 10 +++- source/libs/function/src/udfd.c | 3 +- .../libs/sync/test/sync_test_lib/src/syncIO.c | 26 ++++----- source/libs/transport/inc/transComm.h | 2 + source/libs/transport/inc/transportInt.h | 6 +-- source/libs/transport/src/trans.c | 1 + source/libs/transport/src/transCli.c | 11 ++-- source/libs/transport/src/transComm.c | 2 +- source/libs/transport/src/transSvr.c | 11 ++-- source/libs/transport/test/cliBench.c | 3 +- source/libs/transport/test/svrBench.c | 7 ++- tools/shell/src/shellNettest.c | 4 ++ 16 files changed, 123 insertions(+), 100 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index c73e5c127a..93e4d72ad7 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -46,6 +46,7 @@ typedef struct SRpcHandleInfo { int8_t noResp; // has response or not(default 0, 0: resp, 1: no resp) int8_t persistHandle; // persist handle or not int8_t hasEpSet; + int32_t cliVer; // app info void *ahandle; // app handle set by client @@ -83,6 +84,7 @@ typedef struct SRpcInit { int32_t sessions; // number of sessions allowed int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int32_t idleTime; // milliseconds, 0 means idle timer is disabled + int32_t compatibilityVer; int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index c64bbfbdb6..238b3613f5 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -29,6 +29,7 @@ #include "trpc.h" #include "tsched.h" #include "ttime.h" +#include "tversion.h" #if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL) #include "cus_name.h" @@ -111,7 +112,8 @@ static void deregisterRequest(SRequestObj *pRequest) { atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); if (tsSlowLogScope & reqType) { taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s", - taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, pRequest->sqlstr); + taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, + pRequest->sqlstr); } } @@ -175,6 +177,8 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); + void *pDnodeConn = rpcOpen(&rpcInit); if (pDnodeConn == NULL) { tscError("failed to init connection to server"); @@ -358,17 +362,16 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); } - void destroySubRequests(SRequestObj *pRequest) { - int32_t reqIdx = -1; + int32_t reqIdx = -1; SRequestObj *pReqList[16] = {NULL}; - uint64_t tmpRefId = 0; + uint64_t tmpRefId = 0; if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) { return; } - - SRequestObj* pTmp = pRequest; + + SRequestObj *pTmp = pRequest; while (pTmp->relation.prevRefId) { tmpRefId = pTmp->relation.prevRefId; pTmp = acquireRequest(tmpRefId); @@ -376,9 +379,9 @@ void destroySubRequests(SRequestObj *pRequest) { pReqList[++reqIdx] = pTmp; releaseRequest(tmpRefId); } else { - tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, - tmpRefId, pTmp->requestId); - break; + tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId, + pTmp->requestId); + break; } } @@ -391,16 +394,15 @@ void destroySubRequests(SRequestObj *pRequest) { pTmp = acquireRequest(tmpRefId); if (pTmp) { tmpRefId = pTmp->relation.nextRefId; - removeRequest(pTmp->self); + removeRequest(pTmp->self); releaseRequest(pTmp->self); } else { tscError("0x%" PRIx64 " is not there", tmpRefId); - break; + break; } } } - void doDestroyRequest(void *p) { if (NULL == p) { return; @@ -412,7 +414,7 @@ void doDestroyRequest(void *p) { tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); destroySubRequests(pRequest); - + taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); schedulerFreeJob(&pRequest->body.queryJob, 0); @@ -473,15 +475,15 @@ void taosStopQueryImpl(SRequestObj *pRequest) { } void stopAllQueries(SRequestObj *pRequest) { - int32_t reqIdx = -1; + int32_t reqIdx = -1; SRequestObj *pReqList[16] = {NULL}; - uint64_t tmpRefId = 0; + uint64_t tmpRefId = 0; if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) { return; } - - SRequestObj* pTmp = pRequest; + + SRequestObj *pTmp = pRequest; while (pTmp->relation.prevRefId) { tmpRefId = pTmp->relation.prevRefId; pTmp = acquireRequest(tmpRefId); @@ -489,9 +491,9 @@ void stopAllQueries(SRequestObj *pRequest) { pReqList[++reqIdx] = pTmp; releaseRequest(tmpRefId); } else { - tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, - tmpRefId, pTmp->requestId); - break; + tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId, + pTmp->requestId); + break; } } @@ -510,12 +512,11 @@ void stopAllQueries(SRequestObj *pRequest) { releaseRequest(pTmp->self); } else { tscError("0x%" PRIx64 " is not there", tmpRefId); - break; + break; } } } - void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); } static void *tscCrashReportThreadFp(void *param) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 955c90fc81..14d6394fc4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -26,7 +26,7 @@ #include "tpagedbuf.h" #include "tref.h" #include "tsched.h" - +#include "tversion.h" static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); @@ -237,8 +237,9 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, return TSDB_CODE_SUCCESS; } -int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest) { - int32_t code = buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0); +int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) { + int32_t code = + buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0); if (TSDB_CODE_SUCCESS == code) { pRequest->relation.prevRefId = (*pNewRequest)->self; (*pNewRequest)->relation.nextRefId = pRequest->self; @@ -502,8 +503,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t pResInfo->userFields[i].bytes = pSchema[i].bytes; pResInfo->userFields[i].type = pSchema[i].type; - if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || - pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) { + if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) { pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE; } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) { pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; @@ -891,7 +891,7 @@ static bool incompletaFileParsing(SNode* pStmt) { void continuePostSubQuery(SRequestObj* pRequest, TAOS_ROW row) { SSqlCallbackWrapper* pWrapper = pRequest->pWrapper; - int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId); + int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId); if (TSDB_CODE_SUCCESS == code) { int64_t analyseStart = taosGetTimestampUs(); code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, (void**)row); @@ -934,7 +934,7 @@ void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) { TAOS_ROW row = NULL; if (rowNum > 0) { - row = taos_fetch_row(res); // for single row only now + row = taos_fetch_row(res); // for single row only now } SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId); @@ -2135,6 +2135,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) { @@ -2494,11 +2495,10 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, return pRequest; } +static void fetchCallback(void* pResult, void* param, int32_t code) { + SRequestObj* pRequest = (SRequestObj*)param; -static void fetchCallback(void *pResult, void *param, int32_t code) { - SRequestObj *pRequest = (SRequestObj *)param; - - SReqResultInfo *pResultInfo = &pRequest->body.resInfo; + SReqResultInfo* pResultInfo = &pRequest->body.resInfo; tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); @@ -2520,7 +2520,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) { } pRequest->code = - setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true); + setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, true); if (pRequest->code != TSDB_CODE_SUCCESS) { pResultInfo->numOfRows = 0; pRequest->code = code; @@ -2531,19 +2531,19 @@ static void fetchCallback(void *pResult, void *param, int32_t code) { pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId); - STscObj *pTscObj = pRequest->pTscObj; - SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; - atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); + STscObj* pTscObj = pRequest->pTscObj; + SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary; + atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); } pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); } -void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param) { +void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) { pRequest->body.fetchFp = fp; pRequest->body.param = param; - SReqResultInfo *pResultInfo = &pRequest->body.resInfo; + SReqResultInfo* pResultInfo = &pRequest->body.resInfo; // this query has no results or error exists, return directly if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) { @@ -2578,5 +2578,3 @@ void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param schedulerFetchRows(pRequest->body.queryJob, &req); } - - diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 7ce838cd2c..e262ee04b9 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -558,13 +558,12 @@ int taos_select_db(TAOS *taos, const char *db) { return code; } - void taos_stop_query(TAOS_RES *res) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { return; } - stopAllQueries((SRequestObj*)res); + stopAllQueries((SRequestObj *)res); } bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { @@ -785,7 +784,7 @@ void destorySqlCallbackWrapper(SSqlCallbackWrapper *pWrapper) { taosMemoryFree(pWrapper); } -void destroyCtxInRequest(SRequestObj* pRequest) { +void destroyCtxInRequest(SRequestObj *pRequest) { schedulerFreeJob(&pRequest->body.queryJob, 0); qDestroyQuery(pRequest->pQuery); pRequest->pQuery = NULL; @@ -793,7 +792,6 @@ void destroyCtxInRequest(SRequestObj* pRequest) { pRequest->pWrapper = NULL; } - static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t code) { SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param; SRequestObj *pRequest = pWrapper->pRequest; @@ -807,15 +805,15 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t if (TSDB_CODE_SUCCESS == code) { code = qAnalyseSqlSemantic(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery); } - + pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart; - + handleQueryAnslyseRes(pWrapper, pResultMeta, code); } -int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) { - int32_t code = TSDB_CODE_SUCCESS; - SCatalogReq* pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq)); +int32_t cloneCatalogReq(SCatalogReq **ppTarget, SCatalogReq *pSrc) { + int32_t code = TSDB_CODE_SUCCESS; + SCatalogReq *pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq)); if (pTarget == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; } else { @@ -842,17 +840,16 @@ int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) { return code; } - -void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode* pRoot) { - SRequestObj* pNewRequest = NULL; - SSqlCallbackWrapper* pNewWrapper = NULL; - int32_t code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest); +void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode *pRoot) { + SRequestObj *pNewRequest = NULL; + SSqlCallbackWrapper *pNewWrapper = NULL; + int32_t code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest); if (code) { handleQueryAnslyseRes(pWrapper, pResultMeta, code); return; } - pNewRequest->pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + pNewRequest->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY); if (NULL == pNewRequest->pQuery) { code = TSDB_CODE_OUT_OF_MEMORY; } else { @@ -871,16 +868,16 @@ void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResult } void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code) { - SRequestObj *pRequest = pWrapper->pRequest; - SQuery *pQuery = pRequest->pQuery; + SRequestObj *pRequest = pWrapper->pRequest; + SQuery *pQuery = pRequest->pQuery; if (code == TSDB_CODE_SUCCESS && pQuery->pPrevRoot) { - SNode* prevRoot = pQuery->pPrevRoot; + SNode *prevRoot = pQuery->pPrevRoot; pQuery->pPrevRoot = NULL; handleSubQueryFromAnalyse(pWrapper, pResultMeta, prevRoot); return; } - + if (code == TSDB_CODE_SUCCESS) { pRequest->stableQuery = pQuery->stableQuery; if (pQuery->pRoot) { @@ -1043,7 +1040,7 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { } int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; STscObj *pTscObj = pRequest->pTscObj; SSqlCallbackWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); if (pWrapper == NULL) { @@ -1081,7 +1078,6 @@ int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *p return code; } - void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { SSqlCallbackWrapper *pWrapper = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -1128,12 +1124,12 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { } void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { - int32_t reqIdx = 0; + int32_t reqIdx = 0; SRequestObj *pReqList[16] = {NULL}; SRequestObj *pUserReq = NULL; pReqList[0] = pRequest; - uint64_t tmpRefId = 0; - SRequestObj* pTmp = pRequest; + uint64_t tmpRefId = 0; + SRequestObj *pTmp = pRequest; while (pTmp->relation.prevRefId) { tmpRefId = pTmp->relation.prevRefId; pTmp = acquireRequest(tmpRefId); @@ -1141,9 +1137,9 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { pReqList[++reqIdx] = pTmp; releaseRequest(tmpRefId); } else { - tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, - tmpRefId, pTmp->requestId); - break; + tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId, + pTmp->requestId); + break; } } @@ -1152,11 +1148,11 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { pTmp = acquireRequest(tmpRefId); if (pTmp) { tmpRefId = pTmp->relation.nextRefId; - removeRequest(pTmp->self); + removeRequest(pTmp->self); releaseRequest(pTmp->self); } else { tscError("0x%" PRIx64 " is not there", tmpRefId); - break; + break; } } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index ea46b70693..e6c2fe114f 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" #include "qworker.h" +#include "tversion.h" static inline void dmSendRsp(SRpcMsg *pMsg) { rpcSendResponse(pMsg); } @@ -73,6 +74,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType), pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId); + int32_t svrVer = 0; + taosVersionStrToInt(version, &svrVer); + if (0 != taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) { + goto _OVER; + } + switch (pRpc->msgType) { case TDMT_DND_NET_TEST: dmProcessNetTestReq(pDnode, pRpc); @@ -305,6 +312,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { @@ -339,7 +347,7 @@ int32_t dmInitServer(SDnode *pDnode) { rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.parent = pDnode; rpcInit.compressSize = tsCompressMsgSize; - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); pTrans->serverRpc = rpcOpen(&rpcInit); if (pTrans->serverRpc == NULL) { dError("failed to init dnode rpc server"); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 3cc7c052cc..7371017111 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -29,6 +29,7 @@ #include "tmsg.h" #include "trpc.h" #include "tmisce.h" +#include "tversion.h" // clang-format on #define UDFD_MAX_SCRIPT_PLUGINS 64 @@ -1038,7 +1039,7 @@ int32_t udfdOpenClientRpc() { connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); global.clientRpc = rpcOpen(&rpcInit); if (global.clientRpc == NULL) { fnError("failed to init dnode rpc client"); diff --git a/source/libs/sync/test/sync_test_lib/src/syncIO.c b/source/libs/sync/test/sync_test_lib/src/syncIO.c index 2e00785586..4f8ae59348 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncIO.c +++ b/source/libs/sync/test/sync_test_lib/src/syncIO.c @@ -21,6 +21,7 @@ #include "tglobal.h" #include "ttimer.h" #include "tutil.h" +#include "tversion.h" bool gRaftDetailLog = false; SSyncIO *gSyncIO = NULL; @@ -188,7 +189,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) { rpcInit.idleTime = 100; rpcInit.user = "sync-io"; rpcInit.connType = TAOS_CONN_CLIENT; - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); io->clientRpc = rpcOpen(&rpcInit); if (io->clientRpc == NULL) { sError("failed to initialize RPC"); @@ -209,7 +210,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) { rpcInit.idleTime = 2 * 1500; rpcInit.parent = io; rpcInit.connType = TAOS_CONN_SERVER; - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); void *pRpc = rpcOpen(&rpcInit); if (pRpc == NULL) { sError("failed to start RPC server"); @@ -470,11 +471,10 @@ static void syncIOTickPing(void *param, void *tmrId) { taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer); } -void syncEntryDestory(SSyncRaftEntry* pEntry) {} +void syncEntryDestory(SSyncRaftEntry *pEntry) {} - -void syncUtilMsgNtoH(void* msg) { - SMsgHead* pHead = msg; +void syncUtilMsgNtoH(void *msg) { + SMsgHead *pHead = msg; pHead->contLen = ntohl(pHead->contLen); pHead->vgId = ntohl(pHead->vgId); } @@ -487,9 +487,9 @@ static inline bool syncUtilCanPrint(char c) { } } -char* syncUtilPrintBin(char* ptr, uint32_t len) { +char *syncUtilPrintBin(char *ptr, uint32_t len) { int64_t memLen = (int64_t)(len + 1); - char* s = taosMemoryMalloc(memLen); + char *s = taosMemoryMalloc(memLen); ASSERT(s != NULL); memset(s, 0, len + 1); memcpy(s, ptr, len); @@ -502,13 +502,13 @@ char* syncUtilPrintBin(char* ptr, uint32_t len) { return s; } -char* syncUtilPrintBin2(char* ptr, uint32_t len) { +char *syncUtilPrintBin2(char *ptr, uint32_t len) { uint32_t len2 = len * 4 + 1; - char* s = taosMemoryMalloc(len2); + char *s = taosMemoryMalloc(len2); ASSERT(s != NULL); memset(s, 0, len2); - char* p = s; + char *p = s; for (int32_t i = 0; i < len; ++i) { int32_t n = sprintf(p, "%d,", ptr[i]); p += n; @@ -516,7 +516,7 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len) { return s; } -void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) { +void syncUtilU642Addr(uint64_t u64, char *host, int64_t len, uint16_t *port) { uint32_t hostU32 = (uint32_t)((u64 >> 32) & 0x00000000FFFFFFFF); struct in_addr addr = {.s_addr = hostU32}; @@ -524,7 +524,7 @@ void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) { *port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16); } -uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { +uint64_t syncUtilAddr2U64(const char *host, uint16_t port) { uint32_t hostU32 = taosGetIpv4FromFqdn(host); if (hostU32 == (uint32_t)-1) { sError("failed to resolve ipv4 addr, host:%s", host); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a2c486767f..f5570f6afd 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -154,6 +154,7 @@ typedef struct { #pragma pack(push, 1) +#define TRANS_VER 2 typedef struct { char version : 4; // RPC version char comp : 2; // compression algorithm, 0:no compression 1:lz4 @@ -167,6 +168,7 @@ typedef struct { uint64_t timestamp; char user[TSDB_UNI_LEN]; uint32_t magicNum; + uint32_t compatibilityVer; STraceId traceId; uint64_t ahandle; // ahandle assigned by client uint32_t code; // del later diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 8ea0064d44..9ef2373b3f 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -46,9 +46,9 @@ typedef struct { int8_t connType; char label[TSDB_LABEL_LEN]; char user[TSDB_UNI_LEN]; // meter ID - - int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size - int8_t encryption; // encrypt or not + int32_t compatibilityVer; + int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size + int8_t encryption; // encrypt or not int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 0771f9198a..08b0451982 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -50,6 +50,7 @@ void* rpcOpen(const SRpcInit* pInit) { } pRpc->encryption = pInit->encryption; + pRpc->compatibilityVer = pInit->compatibilityVer; pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval pRpc->retryStepFactor = pInit->retryStepFactor; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1709fc3cb1..8af4427e22 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -984,11 +984,10 @@ void cliSendBatch(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - SCliBatch* pBatch = pConn->pBatch; - SCliBatchList* pList = pBatch->pList; - pList->connCnt += 1; + SCliBatch* pBatch = pConn->pBatch; + int32_t wLen = pBatch->wLen; - int32_t wLen = pBatch->wLen; + pBatch->pList->connCnt += 1; uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t)); int i = 0; @@ -1018,6 +1017,8 @@ void cliSendBatch(SCliConn* pConn) { memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); pHead->traceId = pMsg->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); + pHead->version = TRANS_VER; + pHead->compatibilityVer = htonl(pTransInst->compatibilityVer); } pHead->timestamp = taosHton64(taosGetTimestampUs()); @@ -1074,6 +1075,8 @@ void cliSend(SCliConn* pConn) { memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); pHead->traceId = pMsg->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); + pHead->version = TRANS_VER; + pHead->compatibilityVer = htonl(pTransInst->compatibilityVer); } pHead->timestamp = taosHton64(taosGetTimestampUs()); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 0dfc7677b3..b14db9497e 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -192,7 +192,7 @@ bool transReadComplete(SConnBuffer* connBuf) { memcpy((char*)&head, connBuf->buf, sizeof(head)); int32_t msgLen = (int32_t)htonl(head.msgLen); p->total = msgLen; - p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)); + p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)) || head.version != TRANS_VER; } if (p->total >= p->len) { p->left = p->total - p->len; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index da3b0ad626..496295938a 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -236,8 +236,8 @@ static bool uvHandleReq(SSvrConn* pConn) { if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); if (cost >= EXCEPTION_LIMIT_US) { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", transLabel(pTransInst), - pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", + transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); } else { tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); @@ -245,8 +245,8 @@ static bool uvHandleReq(SSvrConn* pConn) { } else { if (cost >= EXCEPTION_LIMIT_US) { tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception", - transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, - transMsg.code, (int)(cost)); + transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, + transMsg.code, (int)(cost)); } else { tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus", transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, @@ -262,6 +262,7 @@ static bool uvHandleReq(SSvrConn* pConn) { transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId); transMsg.info.refId = pConn->refId; transMsg.info.traceId = pHead->traceId; + transMsg.info.cliVer = htonl(pHead->compatibilityVer); tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn, pConn->refId); @@ -410,6 +411,8 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; pHead->magicNum = htonl(TRANS_MAGIC_NUM); + pHead->compatibilityVer = 0; + pHead->version = TRANS_VER; // handle invalid drop_task resp, TD-20098 if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { diff --git a/source/libs/transport/test/cliBench.c b/source/libs/transport/test/cliBench.c index aaee162cd7..8a5276b814 100644 --- a/source/libs/transport/test/cliBench.c +++ b/source/libs/transport/test/cliBench.c @@ -19,6 +19,7 @@ #include "transLog.h" #include "trpc.h" #include "tutil.h" +#include "tversion.h" typedef struct { int index; @@ -155,7 +156,7 @@ int main(int argc, char *argv[]) { } initLogEnv(); - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); void *pRpc = rpcOpen(&rpcInit); if (pRpc == NULL) { tError("failed to initialize RPC"); diff --git a/source/libs/transport/test/svrBench.c b/source/libs/transport/test/svrBench.c index 4e2395b17b..a3fa81662c 100644 --- a/source/libs/transport/test/svrBench.c +++ b/source/libs/transport/test/svrBench.c @@ -13,12 +13,13 @@ * along with this program. If not, see . */ -//#define _DEFAULT_SOURCE +// #define _DEFAULT_SOURCE #include "os.h" #include "tglobal.h" #include "tqueue.h" #include "transLog.h" #include "trpc.h" +#include "tversion.h" int msgSize = 128; int commit = 0; @@ -151,6 +152,8 @@ int main(int argc, char *argv[]) { rpcInit.numOfThreads = 1; rpcInit.cfp = processRequestMsg; rpcInit.idleTime = 2 * 1500; + + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); rpcDebugFlag = 131; for (int i = 1; i < argc; ++i) { @@ -187,7 +190,7 @@ int main(int argc, char *argv[]) { rpcInit.connType = TAOS_CONN_SERVER; initLogEnv(); - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); void *pRpc = rpcOpen(&rpcInit); if (pRpc == NULL) { tError("failed to start RPC server"); diff --git a/tools/shell/src/shellNettest.c b/tools/shell/src/shellNettest.c index 1a6ac3489d..9fe92212ca 100644 --- a/tools/shell/src/shellNettest.c +++ b/tools/shell/src/shellNettest.c @@ -15,6 +15,7 @@ #define _GNU_SOURCE #include "shellInt.h" +#include "tversion.h" static void shellWorkAsClient() { SShellArgs *pArgs = &shell.args; @@ -33,6 +34,7 @@ static void shellWorkAsClient() { rpcInit.user = "_dnd"; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) { printf("failed to init net test client since %s\r\n", terrstr()); @@ -123,6 +125,8 @@ static void shellWorkAsServer() { rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1000; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); + void *serverRpc = rpcOpen(&rpcInit); if (serverRpc == NULL) { printf("failed to init net test server since %s\r\n", terrstr()); From af053ec75bc0c0d65eb318b534ebe4101aef2721 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 5 Jul 2023 11:15:12 +0000 Subject: [PATCH 02/12] add version check in rpc --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 1 + source/libs/transport/src/transCli.c | 1 + source/libs/transport/src/transSvr.c | 3 ++- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index e6c2fe114f..a7a7273bd0 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -77,6 +77,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { int32_t svrVer = 0; taosVersionStrToInt(version, &svrVer); if (0 != taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) { + dError("msg ver: %d, curr ver: %d", pRpc->info.cliVer, svrVer); goto _OVER; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8af4427e22..e2d4983e57 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -391,6 +391,7 @@ void cliHandleResp(SCliConn* conn) { transMsg.info.ahandle = NULL; transMsg.info.traceId = pHead->traceId; transMsg.info.hasEpSet = pHead->hasEpSet; + transMsg.info.cliVer = pHead->compatibilityVer; SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 496295938a..0f165d04b2 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -196,6 +196,7 @@ static bool uvHandleReq(SSvrConn* pConn) { tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn); return false; } + tDebug("head version: %d 2", pHead->version); pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); @@ -411,7 +412,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; pHead->magicNum = htonl(TRANS_MAGIC_NUM); - pHead->compatibilityVer = 0; + pHead->compatibilityVer = ((STrans*)pConn->pTransInst)->compatibilityVer; pHead->version = TRANS_VER; // handle invalid drop_task resp, TD-20098 From d0d8d93c94f68bb1ac58271a38423140de622556 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 01:29:49 +0000 Subject: [PATCH 03/12] add version check in rpc --- source/libs/transport/src/transCli.c | 2 +- source/libs/transport/src/transSvr.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e2d4983e57..227fc63680 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -391,7 +391,7 @@ void cliHandleResp(SCliConn* conn) { transMsg.info.ahandle = NULL; transMsg.info.traceId = pHead->traceId; transMsg.info.hasEpSet = pHead->hasEpSet; - transMsg.info.cliVer = pHead->compatibilityVer; + transMsg.info.cliVer = htonl(pHead->compatibilityVer); SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 0f165d04b2..f23e176c79 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -412,7 +412,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; pHead->magicNum = htonl(TRANS_MAGIC_NUM); - pHead->compatibilityVer = ((STrans*)pConn->pTransInst)->compatibilityVer; + pHead->compatibilityVer = htonl(((STrans*)pConn->pTransInst)->compatibilityVer); pHead->version = TRANS_VER; // handle invalid drop_task resp, TD-20098 From a630d1284ca79f33d3043fe7ea56ecd37dadc13f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 02:03:31 +0000 Subject: [PATCH 04/12] add version check in rpc --- source/dnode/mgmt/test/sut/src/client.cpp | 2 ++ source/libs/transport/test/transUT.cpp | 8 ++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/test/sut/src/client.cpp b/source/dnode/mgmt/test/sut/src/client.cpp index a27a511651..95eea2359d 100644 --- a/source/dnode/mgmt/test/sut/src/client.cpp +++ b/source/dnode/mgmt/test/sut/src/client.cpp @@ -16,6 +16,7 @@ #include "sut.h" #include "tdatablock.h" #include "tmisce.h" +#include "tversion.h" static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) { TestClient* client = (TestClient*)parent; @@ -53,6 +54,7 @@ void TestClient::DoInit() { rpcInit.parent = this; // rpcInit.secret = (char*)secretEncrypt; // rpcInit.spi = 1; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); clientRpc = rpcOpen(&rpcInit); ASSERT(clientRpc); diff --git a/source/libs/transport/test/transUT.cpp b/source/libs/transport/test/transUT.cpp index 88a1e2564f..2fa94c358f 100644 --- a/source/libs/transport/test/transUT.cpp +++ b/source/libs/transport/test/transUT.cpp @@ -18,10 +18,10 @@ #include "tdatablock.h" #include "tglobal.h" #include "tlog.h" +#include "tmisce.h" #include "transLog.h" #include "trpc.h" -#include "tmisce.h" - +#include "tversion.h" using namespace std; const char *label = "APP"; @@ -54,6 +54,8 @@ class Client { rpcInit_.user = (char *)user; rpcInit_.parent = this; rpcInit_.connType = TAOS_CONN_CLIENT; + + taosVersionStrToInt(version, &(rpcInit_.compatibilityVer)); this->transCli = rpcOpen(&rpcInit_); tsem_init(&this->sem, 0, 0); } @@ -66,6 +68,7 @@ class Client { void Restart(CB cb) { rpcClose(this->transCli); rpcInit_.cfp = cb; + taosVersionStrToInt(version, &(rpcInit_.compatibilityVer)); this->transCli = rpcOpen(&rpcInit_); } void Stop() { @@ -117,6 +120,7 @@ class Server { rpcInit_.cfp = processReq; rpcInit_.user = (char *)user; rpcInit_.connType = TAOS_CONN_SERVER; + taosVersionStrToInt(version, &(rpcInit_.compatibilityVer)); } void Start() { this->transSrv = rpcOpen(&this->rpcInit_); From d061c54a180e4d9b239c606c0eeb63942c839b5c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 06:41:06 +0000 Subject: [PATCH 05/12] add ver check --- source/libs/transport/inc/transportInt.h | 2 +- source/libs/transport/src/transCli.c | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 9ef2373b3f..ca48da690b 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -49,7 +49,7 @@ typedef struct { int32_t compatibilityVer; int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int8_t encryption; // encrypt or not - + int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor int32_t retryMaxInterval; // retry max interval diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 227fc63680..8062a0618b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -489,6 +489,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; transMsg.info.ahandle = NULL; + transMsg.info.cliVer = pTransInst->compatibilityVer; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); @@ -1350,6 +1351,7 @@ static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) { transMsg.info.ahandle = pMsg->ctx->ahandle; transMsg.info.traceId = pMsg->msg.info.traceId; transMsg.info.hasEpSet = false; + transMsg.info.cliVer = pTransInst->compatibilityVer; if (pCtx->pSem != NULL) { if (pCtx->pRsp == NULL) { } else { @@ -1531,6 +1533,9 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { // persist conn already release by server STransMsg resp; cliBuildExceptResp(pMsg, &resp); + // refactorr later + resp.info.cliVer = pTransInst->compatibilityVer; + if (pMsg->type != Release) { pTransInst->cfp(pTransInst->parent, &resp, NULL); } @@ -1840,6 +1845,7 @@ void cliIteraConnMsgs(SCliConn* conn) { if (-1 == cliBuildExceptResp(cmsg, &resp)) { continue; } + resp.info.cliVer = pTransInst->compatibilityVer; pTransInst->cfp(pTransInst->parent, &resp, NULL); cmsg->ctx->ahandle = NULL; From 5e156aa0a9b0eaf153ec2bf0ca207e498529a76b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 06:43:07 +0000 Subject: [PATCH 06/12] add ver check --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index a7a7273bd0..5d6d16ccf8 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -77,7 +77,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { int32_t svrVer = 0; taosVersionStrToInt(version, &svrVer); if (0 != taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) { - dError("msg ver: %d, curr ver: %d", pRpc->info.cliVer, svrVer); + dError("Version not compatible, cli ver: %d, svr ver: %d", pRpc->info.cliVer, svrVer); goto _OVER; } From 196f2b8f65f8676ae8d96125c24a6f996edb42b1 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Thu, 6 Jul 2023 16:45:53 +0800 Subject: [PATCH 07/12] test:comment connection of new client and old server --- tests/system-test/0-others/compatibility.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py index 22e319fdaf..b8623c3bcd 100644 --- a/tests/system-test/0-others/compatibility.py +++ b/tests/system-test/0-others/compatibility.py @@ -151,9 +151,10 @@ class TDTestCase: os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '") os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql") - cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;" - if os.system(cmd) == 0: - raise Exception("failed to execute system command. cmd: %s" % cmd) + # cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;" + # tdLog.info(f"new client version connect to old version taosd, commad return value:{cmd}") + # if os.system(cmd) == 0: + # raise Exception("failed to execute system command. cmd: %s" % cmd) os.system("pkill taosd") # make sure all the data are saved in disk. self.checkProcessPid("taosd") From 09222801c490e1570919d42bc0a6b2a6ff015b50 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 6 Jul 2023 17:52:48 +0800 Subject: [PATCH 08/12] fix(coverity): fix coverity scan issues --- source/common/src/trow.c | 2 +- source/dnode/vnode/src/meta/metaTable.c | 7 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 115 +++++++----------------- source/libs/tdb/src/db/tdbBtree.c | 10 +-- source/libs/tdb/src/db/tdbDb.c | 5 +- source/libs/tdb/src/db/tdbPager.c | 1 + 6 files changed, 48 insertions(+), 92 deletions(-) diff --git a/source/common/src/trow.c b/source/common/src/trow.c index 8ae77bcd0a..039f436505 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -423,7 +423,7 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) { val = (const void *)&pColVal->value.val; } } else { - pColVal = NULL; + // pColVal = NULL; valType = TD_VTYPE_NONE; } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index eb2d2e267b..32b63fa950 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -207,7 +207,10 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { tb_uid_t uid = *(tb_uid_t *)pData; tdbFree(pData); SMetaInfo info; - metaGetInfo(pMeta, uid, &info, NULL); + if (metaGetInfo(pMeta, uid, &info, NULL) == TSDB_CODE_NOT_FOUND) { + terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; + return -1; + } if (info.uid == info.suid) { return 0; } else { @@ -939,7 +942,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) { return 0; } - metaInfo("ttl find expired table count: %zu" , TARRAY_SIZE(tbUids)); + metaInfo("ttl find expired table count: %zu", TARRAY_SIZE(tbUids)); metaDropTables(pMeta, tbUids); return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 31b13b8411..f4e888919e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1031,7 +1031,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache return code; } - +/* int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { int32_t code = 0; SLRUCache *pCache = pTsdb->lruCache; @@ -1079,7 +1079,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR return code; } - +*/ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t code = 0; // fetch schema @@ -1829,10 +1829,11 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa } *pIgnoreEarlierTs = false; + /* if (!hasVal) { state->state = SFSLASTNEXTROW_FILESET; } - + */ if (!state->checkRemainingRow) { state->checkRemainingRow = true; } @@ -2020,10 +2021,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk); if (block.maxKey.ts <= state->lastTs) { *pIgnoreEarlierTs = true; - if (state->pBlockData) { - tBlockDataDestroy(state->pBlockData); - state->pBlockData = NULL; - } + + tBlockDataDestroy(state->pBlockData); + state->pBlockData = NULL; *ppRow = NULL; return code; @@ -3176,97 +3176,46 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, TSKEY rowTs = TSDBROW_TS(pRow); - if (lastRowTs == TSKEY_MAX) { - lastRowTs = rowTs; + lastRowTs = rowTs; - for (int16_t iCol = noneCol; iCol < nCols; ++iCol) { - if (iCol >= nLastCol) { - break; - } - SLastCol *pCol = taosArrayGet(pColArray, iCol); - if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) { - continue; - } - if (slotIds[iCol] == 0) { - STColumn *pTColumn = &pTSchema->columns[0]; - - *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = rowTs}); - taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal}); - continue; - } - tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - - *pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal}; - if (IS_VAR_DATA_TYPE(pColVal->type) /*&& pColVal->value.nData > 0*/) { - pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); - if (pCol->colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - if (pColVal->value.nData > 0) { - memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); - } - } - - /*if (COL_VAL_IS_NONE(pColVal)) { - if (!setNoneCol) { - noneCol = iCol; - setNoneCol = true; - } - } else {*/ - int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ); - if (aColIndex >= 0) { - taosArrayRemove(aColArray, aColIndex); - } - //} - } - if (!setNoneCol) { - // done, goto return pColArray - break; - } else { - continue; - } - } - - // merge into pColArray - setNoneCol = false; for (int16_t iCol = noneCol; iCol < nCols; ++iCol) { if (iCol >= nLastCol) { break; } - // high version's column value - SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol); - if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) { + SLastCol *pCol = taosArrayGet(pColArray, iCol); + if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) { continue; } - SColVal *tColVal = &lastColVal->colVal; + if (slotIds[iCol] == 0) { + STColumn *pTColumn = &pTSchema->columns[0]; + *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = rowTs}); + taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal}); + continue; + } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { - SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal}; - if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { - SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); - taosMemoryFree(pLastCol->colVal.value.pData); - lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); - if (lastCol.colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + *pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal}; + if (IS_VAR_DATA_TYPE(pColVal->type) /*&& pColVal->value.nData > 0*/) { + pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); + if (pCol->colVal.value.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; } + if (pColVal->value.nData > 0) { + memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + } - taosArraySet(pColArray, iCol, &lastCol); - int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ); + int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ); + if (aColIndex >= 0) { taosArrayRemove(aColArray, aColIndex); - } else if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal) && !setNoneCol) { - noneCol = iCol; - setNoneCol = true; } } - } while (setNoneCol); + + break; + } while (1); if (!hasRow) { if (ignoreEarlierTs) { diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index c49b5726b6..db05f3e287 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -345,7 +345,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL } *ppKey = pTKey; *pkLen = cd.kLen; - memcpy(*ppKey, cd.pKey, cd.kLen); + memcpy(*ppKey, cd.pKey, (size_t)cd.kLen); } if (ppVal) { @@ -357,7 +357,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL } *ppVal = pTVal; *vLen = cd.vLen; - memcpy(*ppVal, cd.pVal, cd.vLen); + memcpy(*ppVal, cd.pVal, (size_t)cd.vLen); } if (TDB_CELLDECODER_FREE_KEY(&cd)) { @@ -1793,7 +1793,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { *ppKey = pKey; *kLen = cd.kLen; - memcpy(pKey, cd.pKey, cd.kLen); + memcpy(pKey, cd.pKey, (size_t)cd.kLen); if (ppVal) { if (cd.vLen > 0) { @@ -1852,7 +1852,7 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { *ppKey = pKey; *kLen = cd.kLen; - memcpy(pKey, cd.pKey, cd.kLen); + memcpy(pKey, cd.pKey, (size_t)cd.kLen); if (ppVal) { // TODO: vLen may be zero @@ -1864,7 +1864,7 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { *ppVal = pVal; *vLen = cd.vLen; - memcpy(pVal, cd.pVal, cd.vLen); + memcpy(pVal, cd.pVal, (size_t)cd.vLen); } ret = tdbBtcMoveToPrev(pBtc); diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index 952c49db73..0e9f192226 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -62,7 +62,10 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i } memset(pDb->pgrHash, 0, tsize); - taosMulModeMkDir(dbname, 0755); + ret = taosMulModeMkDir(dbname, 0755); + if (ret < 0) { + return -1; + } #ifdef USE_MAINDB // open main db diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 5ea9be63db..5144a94a25 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -980,6 +980,7 @@ int tdbPagerRestoreJournals(SPager *pPager) { jname[dirLen] = '/'; sprintf(jname + dirLen + 1, TDB_MAINDB_NAME "-journal.%" PRId64, *pTxnId); if (tdbPagerRestore(pPager, jname) < 0) { + taosArrayDestroy(pTxnList); tdbCloseDir(&pDir); tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), jname); From 3bd99776d28bf7d537a44aa2453ffab6ccda88a5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 18:03:15 +0800 Subject: [PATCH 09/12] add rpc version check --- source/libs/transport/inc/transComm.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index f5570f6afd..3b304e2c77 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -167,8 +167,8 @@ typedef struct { uint64_t timestamp; char user[TSDB_UNI_LEN]; + int32_t compatibilityVer; uint32_t magicNum; - uint32_t compatibilityVer; STraceId traceId; uint64_t ahandle; // ahandle assigned by client uint32_t code; // del later From 5964e7819c548faf9382b12e23de8142d32ac6fd Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 6 Jul 2023 19:38:37 +0800 Subject: [PATCH 10/12] test: add check --- tests/system-test/7-tmq/tmqDropConsumer.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/system-test/7-tmq/tmqDropConsumer.py b/tests/system-test/7-tmq/tmqDropConsumer.py index 336285a39e..06ce4c0fd7 100644 --- a/tests/system-test/7-tmq/tmqDropConsumer.py +++ b/tests/system-test/7-tmq/tmqDropConsumer.py @@ -256,6 +256,7 @@ class TDTestCase: tdLog.info("all consumers status into 'lost'") # drop consumer groups + tdLog.info("drop all consumers") for i in range(len(groupIdList)): for j in range(len(topicNameList)): sqlCmd = f"drop consumer group `%s` on %s"%(groupIdList[i], topicNameList[j]) @@ -265,7 +266,17 @@ class TDTestCase: tmqCom.g_end_insert_flag = 1 tdLog.debug("notify sub-thread to stop insert data") pThread.join() + + tdSql.query('show consumers;') + consumerNUm = tdSql.queryRows + tdSql.query('show subscriptions;') + subscribeNum = tdSql.queryRows + + if (0 != consumerNUm or 0 != subscribeNum): + tdLog.exit("drop consumer fail! consumerNUm %d, subscribeNum: %d"%(consumerNUm, subscribeNum)) + + tdLog.info("drop consuer success, there is no consumers and subscribes") tdLog.printNoPrefix("======== test case 1 end ...... ") def run(self): From e7ced3e9ccf216fd05611fa38954dcb370d2248a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 7 Jul 2023 09:04:44 +0800 Subject: [PATCH 11/12] fix: desc table without permission to view any column issue --- source/libs/command/src/command.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index f59653700b..b004539e16 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -87,7 +87,7 @@ static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) { return code; } -static void setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, int32_t numOfRows, STableMeta* pMeta) { +static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, int32_t numOfRows, STableMeta* pMeta) { blockDataEnsureCapacity(pBlock, numOfRows); pBlock->info.rows = 0; @@ -114,6 +114,11 @@ static void setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, in colDataSetVal(pCol4, pBlock->info.rows, buf, false); ++(pBlock->info.rows); } + if (pBlock->info.rows <= 0) { + qError("no permission to view any columns"); + return TSDB_CODE_PAR_PERMISSION_DENIED; + } + return TSDB_CODE_SUCCESS; } static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** pRsp) { @@ -123,7 +128,7 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** SSDataBlock* pBlock = NULL; int32_t code = buildDescResultDataBlock(&pBlock); if (TSDB_CODE_SUCCESS == code) { - setDescResultIntoDataBlock(sysInfoUser, pBlock, numOfRows, pDesc->pMeta); + code = setDescResultIntoDataBlock(sysInfoUser, pBlock, numOfRows, pDesc->pMeta); } if (TSDB_CODE_SUCCESS == code) { code = buildRetrieveTableRsp(pBlock, DESCRIBE_RESULT_COLS, pRsp); From 9c38e7c1b17fbcfde4d99f731cccfcdb77443d89 Mon Sep 17 00:00:00 2001 From: huolibo Date: Fri, 7 Jul 2023 09:43:36 +0800 Subject: [PATCH 12/12] docs(driver): jdbc 3.2.4 description --- docs/en/07-develop/07-tmq.mdx | 4 ---- docs/en/14-reference/03-connector/04-java.mdx | 7 ++++--- docs/zh/07-develop/07-tmq.mdx | 4 ---- docs/zh/08-connector/14-java.mdx | 1 + 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index f5e0378a00..506a8dcc46 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -81,10 +81,6 @@ Set subscription() throws SQLException; ConsumerRecords poll(Duration timeout) throws SQLException; -void commitAsync(); - -void commitAsync(OffsetCommitCallback callback); - void commitSync() throws SQLException; void close() throws SQLException; diff --git a/docs/en/14-reference/03-connector/04-java.mdx b/docs/en/14-reference/03-connector/04-java.mdx index e8c407b125..b68aeda94c 100644 --- a/docs/en/14-reference/03-connector/04-java.mdx +++ b/docs/en/14-reference/03-connector/04-java.mdx @@ -36,15 +36,16 @@ REST connection supports all platforms that can run Java. | taos-jdbcdriver version | major changes | TDengine version | | :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: | +| 3.2.4 | Subscription add the enable.auto.commit parameter and the unsubscribe() method in the WebSocket connection | 3.0.5.0 or later | | 3.2.3 | Fixed resultSet data parsing failure in some cases | 3.0.5.0 or later | -| 3.2.2 | subscription add seek function | 3.0.5.0 or later | +| 3.2.2 | Subscription add seek function | 3.0.5.0 or later | | 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later | | 3.2.0 | This version has been deprecated | - | | 3.1.0 | JDBC REST connection supports subscription over WebSocket | - | | 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment | - | | 3.0.0 | Support for TDengine 3.0 | 3.0.0.0 or later | -| 2.0.42 | fix wasNull interface return value in WebSocket connection | - | -| 2.0.41 | fix decode method of username and password in REST connection | - | +| 2.0.42 | Fix wasNull interface return value in WebSocket connection | - | +| 2.0.41 | Fix decode method of username and password in REST connection | - | | 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | - | | 2.0.38 | JDBC REST connections add bulk pull function | - | | 2.0.37 | Support json tags | - | diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index 54a8af2287..38b91d7cea 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -81,10 +81,6 @@ Set subscription() throws SQLException; ConsumerRecords poll(Duration timeout) throws SQLException; -void commitAsync(); - -void commitAsync(OffsetCommitCallback callback); - void commitSync() throws SQLException; void close() throws SQLException; diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx index c7da2bd4f5..96f8991eea 100644 --- a/docs/zh/08-connector/14-java.mdx +++ b/docs/zh/08-connector/14-java.mdx @@ -36,6 +36,7 @@ REST 连接支持所有能运行 Java 的平台。 | taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 | | :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: | +| 3.2.4 | 数据订阅在 WebSocket 连接下增加 enable.auto.commit 参数,以及 unsubscribe() 方法。 | - | | 3.2.3 | 修复 ResultSet 在一些情况数据解析失败 | - | | 3.2.2 | 新增功能:数据订阅支持 seek 功能。 | 3.0.5.0 及更高版本 | | 3.2.1 | 新增功能:WebSocket 连接支持 schemaless 与 prepareStatement 写入。变更:consumer poll 返回结果集为 ConsumerRecord,可通过 value() 获取指定结果集数据。 | 3.0.3.0 及更高版本 |