From bdfeb329230f95708c1246d9cecf14b2e3adb3cd Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 5 Jul 2023 16:04:34 +0800 Subject: [PATCH 01/24] fix:bugs in tmq_get_topic_assignment --- source/client/src/clientTmq.c | 52 ++++++++++++++++++---------------- source/dnode/vnode/src/tq/tq.c | 10 ++----- 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 83550aa15d..accaa8fa1d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2583,6 +2583,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->vgId = pClientVg->vgId; + tscInfo("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, + pAssignment->vgId, pAssignment->currentOffset); } if (needFetch) { @@ -2673,34 +2675,36 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int32_t num = taosArrayGetSize(pCommon->pList); for(int32_t i = 0; i < num; ++i) { (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i); + tscInfo("consumer:0x%" PRIx64 " get assignment from server:%d->%" PRId64, tmq->consumerId, + (*assignment)[i].vgId, (*assignment)[i].currentOffset); } *numOfAssignment = num; } - for (int32_t j = 0; j < (*numOfAssignment); ++j) { - tmq_topic_assignment* p = &(*assignment)[j]; - - for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) { - SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - if (pClientVg->vgId != p->vgId) { - continue; - } - - SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; - - pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; - - char offsetBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); - - tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf); - - pOffsetInfo->walVerBegin = p->begin; - pOffsetInfo->walVerEnd = p->end; - pOffsetInfo->currentOffset.version = p->currentOffset; - pOffsetInfo->committedOffset.version = p->currentOffset; - } - } +// for (int32_t j = 0; j < (*numOfAssignment); ++j) { +// tmq_topic_assignment* p = &(*assignment)[j]; +// +// for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) { +// SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); +// if (pClientVg->vgId != p->vgId) { +// continue; +// } +// +// SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; +// +// pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; +// +// char offsetBuf[80] = {0}; +// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); +// +// tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf); +// +// pOffsetInfo->walVerBegin = p->begin; +// pOffsetInfo->walVerEnd = p->end; +// pOffsetInfo->currentOffset.version = p->currentOffset; +// pOffsetInfo->committedOffset.version = p->currentOffset; +// } +// } destroyCommonInfo(pCommon); return code; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a293858207..0989b980ea 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -571,6 +571,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { dataRsp.rspOffset.type = TMQ_OFFSET__LOG; dataRsp.rspOffset.version = pOffset->val.version; + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from stroe:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); } else { if (req.useSnapshot == true) { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey); @@ -581,14 +582,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { dataRsp.rspOffset.type = TMQ_OFFSET__LOG; - if (reqOffset.type == TMQ_OFFSET__LOG) { - int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader); - if (currentVer == -1) { // not start to read data from wal yet, return req offset directly - dataRsp.rspOffset.version = reqOffset.version; - } else { - dataRsp.rspOffset.version = currentVer; // return current consume offset value - } - } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { + if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { dataRsp.rspOffset.version = ever; From 79f01ad65578be2694f1309b80fc98b4b7cb80ec Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 5 Jul 2023 08:16:25 +0000 Subject: [PATCH 02/24] add version check in rpc --- include/libs/transport/trpc.h | 2 + source/client/src/clientEnv.c | 45 ++++++++-------- source/client/src/clientImpl.c | 36 ++++++------- source/client/src/clientMain.c | 54 +++++++++---------- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 10 +++- source/libs/function/src/udfd.c | 3 +- .../libs/sync/test/sync_test_lib/src/syncIO.c | 26 ++++----- source/libs/transport/inc/transComm.h | 2 + source/libs/transport/inc/transportInt.h | 6 +-- source/libs/transport/src/trans.c | 1 + source/libs/transport/src/transCli.c | 11 ++-- source/libs/transport/src/transComm.c | 2 +- source/libs/transport/src/transSvr.c | 11 ++-- source/libs/transport/test/cliBench.c | 3 +- source/libs/transport/test/svrBench.c | 7 ++- tools/shell/src/shellNettest.c | 4 ++ 16 files changed, 123 insertions(+), 100 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index c73e5c127a..93e4d72ad7 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -46,6 +46,7 @@ typedef struct SRpcHandleInfo { int8_t noResp; // has response or not(default 0, 0: resp, 1: no resp) int8_t persistHandle; // persist handle or not int8_t hasEpSet; + int32_t cliVer; // app info void *ahandle; // app handle set by client @@ -83,6 +84,7 @@ typedef struct SRpcInit { int32_t sessions; // number of sessions allowed int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int32_t idleTime; // milliseconds, 0 means idle timer is disabled + int32_t compatibilityVer; int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index c64bbfbdb6..238b3613f5 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -29,6 +29,7 @@ #include "trpc.h" #include "tsched.h" #include "ttime.h" +#include "tversion.h" #if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL) #include "cus_name.h" @@ -111,7 +112,8 @@ static void deregisterRequest(SRequestObj *pRequest) { atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); if (tsSlowLogScope & reqType) { taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s", - taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, pRequest->sqlstr); + taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, + pRequest->sqlstr); } } @@ -175,6 +177,8 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); + void *pDnodeConn = rpcOpen(&rpcInit); if (pDnodeConn == NULL) { tscError("failed to init connection to server"); @@ -358,17 +362,16 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); } - void destroySubRequests(SRequestObj *pRequest) { - int32_t reqIdx = -1; + int32_t reqIdx = -1; SRequestObj *pReqList[16] = {NULL}; - uint64_t tmpRefId = 0; + uint64_t tmpRefId = 0; if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) { return; } - - SRequestObj* pTmp = pRequest; + + SRequestObj *pTmp = pRequest; while (pTmp->relation.prevRefId) { tmpRefId = pTmp->relation.prevRefId; pTmp = acquireRequest(tmpRefId); @@ -376,9 +379,9 @@ void destroySubRequests(SRequestObj *pRequest) { pReqList[++reqIdx] = pTmp; releaseRequest(tmpRefId); } else { - tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, - tmpRefId, pTmp->requestId); - break; + tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId, + pTmp->requestId); + break; } } @@ -391,16 +394,15 @@ void destroySubRequests(SRequestObj *pRequest) { pTmp = acquireRequest(tmpRefId); if (pTmp) { tmpRefId = pTmp->relation.nextRefId; - removeRequest(pTmp->self); + removeRequest(pTmp->self); releaseRequest(pTmp->self); } else { tscError("0x%" PRIx64 " is not there", tmpRefId); - break; + break; } } } - void doDestroyRequest(void *p) { if (NULL == p) { return; @@ -412,7 +414,7 @@ void doDestroyRequest(void *p) { tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); destroySubRequests(pRequest); - + taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); schedulerFreeJob(&pRequest->body.queryJob, 0); @@ -473,15 +475,15 @@ void taosStopQueryImpl(SRequestObj *pRequest) { } void stopAllQueries(SRequestObj *pRequest) { - int32_t reqIdx = -1; + int32_t reqIdx = -1; SRequestObj *pReqList[16] = {NULL}; - uint64_t tmpRefId = 0; + uint64_t tmpRefId = 0; if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) { return; } - - SRequestObj* pTmp = pRequest; + + SRequestObj *pTmp = pRequest; while (pTmp->relation.prevRefId) { tmpRefId = pTmp->relation.prevRefId; pTmp = acquireRequest(tmpRefId); @@ -489,9 +491,9 @@ void stopAllQueries(SRequestObj *pRequest) { pReqList[++reqIdx] = pTmp; releaseRequest(tmpRefId); } else { - tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, - tmpRefId, pTmp->requestId); - break; + tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId, + pTmp->requestId); + break; } } @@ -510,12 +512,11 @@ void stopAllQueries(SRequestObj *pRequest) { releaseRequest(pTmp->self); } else { tscError("0x%" PRIx64 " is not there", tmpRefId); - break; + break; } } } - void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); } static void *tscCrashReportThreadFp(void *param) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 955c90fc81..14d6394fc4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -26,7 +26,7 @@ #include "tpagedbuf.h" #include "tref.h" #include "tsched.h" - +#include "tversion.h" static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); @@ -237,8 +237,9 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, return TSDB_CODE_SUCCESS; } -int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest) { - int32_t code = buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0); +int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) { + int32_t code = + buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0); if (TSDB_CODE_SUCCESS == code) { pRequest->relation.prevRefId = (*pNewRequest)->self; (*pNewRequest)->relation.nextRefId = pRequest->self; @@ -502,8 +503,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t pResInfo->userFields[i].bytes = pSchema[i].bytes; pResInfo->userFields[i].type = pSchema[i].type; - if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || - pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) { + if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) { pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE; } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) { pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; @@ -891,7 +891,7 @@ static bool incompletaFileParsing(SNode* pStmt) { void continuePostSubQuery(SRequestObj* pRequest, TAOS_ROW row) { SSqlCallbackWrapper* pWrapper = pRequest->pWrapper; - int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId); + int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId); if (TSDB_CODE_SUCCESS == code) { int64_t analyseStart = taosGetTimestampUs(); code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, (void**)row); @@ -934,7 +934,7 @@ void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) { TAOS_ROW row = NULL; if (rowNum > 0) { - row = taos_fetch_row(res); // for single row only now + row = taos_fetch_row(res); // for single row only now } SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId); @@ -2135,6 +2135,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) { @@ -2494,11 +2495,10 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, return pRequest; } +static void fetchCallback(void* pResult, void* param, int32_t code) { + SRequestObj* pRequest = (SRequestObj*)param; -static void fetchCallback(void *pResult, void *param, int32_t code) { - SRequestObj *pRequest = (SRequestObj *)param; - - SReqResultInfo *pResultInfo = &pRequest->body.resInfo; + SReqResultInfo* pResultInfo = &pRequest->body.resInfo; tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); @@ -2520,7 +2520,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) { } pRequest->code = - setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true); + setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, true); if (pRequest->code != TSDB_CODE_SUCCESS) { pResultInfo->numOfRows = 0; pRequest->code = code; @@ -2531,19 +2531,19 @@ static void fetchCallback(void *pResult, void *param, int32_t code) { pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId); - STscObj *pTscObj = pRequest->pTscObj; - SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; - atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); + STscObj* pTscObj = pRequest->pTscObj; + SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary; + atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); } pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); } -void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param) { +void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) { pRequest->body.fetchFp = fp; pRequest->body.param = param; - SReqResultInfo *pResultInfo = &pRequest->body.resInfo; + SReqResultInfo* pResultInfo = &pRequest->body.resInfo; // this query has no results or error exists, return directly if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) { @@ -2578,5 +2578,3 @@ void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param schedulerFetchRows(pRequest->body.queryJob, &req); } - - diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 7ce838cd2c..e262ee04b9 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -558,13 +558,12 @@ int taos_select_db(TAOS *taos, const char *db) { return code; } - void taos_stop_query(TAOS_RES *res) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { return; } - stopAllQueries((SRequestObj*)res); + stopAllQueries((SRequestObj *)res); } bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { @@ -785,7 +784,7 @@ void destorySqlCallbackWrapper(SSqlCallbackWrapper *pWrapper) { taosMemoryFree(pWrapper); } -void destroyCtxInRequest(SRequestObj* pRequest) { +void destroyCtxInRequest(SRequestObj *pRequest) { schedulerFreeJob(&pRequest->body.queryJob, 0); qDestroyQuery(pRequest->pQuery); pRequest->pQuery = NULL; @@ -793,7 +792,6 @@ void destroyCtxInRequest(SRequestObj* pRequest) { pRequest->pWrapper = NULL; } - static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t code) { SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param; SRequestObj *pRequest = pWrapper->pRequest; @@ -807,15 +805,15 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t if (TSDB_CODE_SUCCESS == code) { code = qAnalyseSqlSemantic(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery); } - + pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart; - + handleQueryAnslyseRes(pWrapper, pResultMeta, code); } -int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) { - int32_t code = TSDB_CODE_SUCCESS; - SCatalogReq* pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq)); +int32_t cloneCatalogReq(SCatalogReq **ppTarget, SCatalogReq *pSrc) { + int32_t code = TSDB_CODE_SUCCESS; + SCatalogReq *pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq)); if (pTarget == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; } else { @@ -842,17 +840,16 @@ int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) { return code; } - -void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode* pRoot) { - SRequestObj* pNewRequest = NULL; - SSqlCallbackWrapper* pNewWrapper = NULL; - int32_t code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest); +void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode *pRoot) { + SRequestObj *pNewRequest = NULL; + SSqlCallbackWrapper *pNewWrapper = NULL; + int32_t code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest); if (code) { handleQueryAnslyseRes(pWrapper, pResultMeta, code); return; } - pNewRequest->pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + pNewRequest->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY); if (NULL == pNewRequest->pQuery) { code = TSDB_CODE_OUT_OF_MEMORY; } else { @@ -871,16 +868,16 @@ void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResult } void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code) { - SRequestObj *pRequest = pWrapper->pRequest; - SQuery *pQuery = pRequest->pQuery; + SRequestObj *pRequest = pWrapper->pRequest; + SQuery *pQuery = pRequest->pQuery; if (code == TSDB_CODE_SUCCESS && pQuery->pPrevRoot) { - SNode* prevRoot = pQuery->pPrevRoot; + SNode *prevRoot = pQuery->pPrevRoot; pQuery->pPrevRoot = NULL; handleSubQueryFromAnalyse(pWrapper, pResultMeta, prevRoot); return; } - + if (code == TSDB_CODE_SUCCESS) { pRequest->stableQuery = pQuery->stableQuery; if (pQuery->pRoot) { @@ -1043,7 +1040,7 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { } int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; STscObj *pTscObj = pRequest->pTscObj; SSqlCallbackWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); if (pWrapper == NULL) { @@ -1081,7 +1078,6 @@ int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *p return code; } - void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { SSqlCallbackWrapper *pWrapper = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -1128,12 +1124,12 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { } void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { - int32_t reqIdx = 0; + int32_t reqIdx = 0; SRequestObj *pReqList[16] = {NULL}; SRequestObj *pUserReq = NULL; pReqList[0] = pRequest; - uint64_t tmpRefId = 0; - SRequestObj* pTmp = pRequest; + uint64_t tmpRefId = 0; + SRequestObj *pTmp = pRequest; while (pTmp->relation.prevRefId) { tmpRefId = pTmp->relation.prevRefId; pTmp = acquireRequest(tmpRefId); @@ -1141,9 +1137,9 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { pReqList[++reqIdx] = pTmp; releaseRequest(tmpRefId); } else { - tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, - tmpRefId, pTmp->requestId); - break; + tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId, + pTmp->requestId); + break; } } @@ -1152,11 +1148,11 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { pTmp = acquireRequest(tmpRefId); if (pTmp) { tmpRefId = pTmp->relation.nextRefId; - removeRequest(pTmp->self); + removeRequest(pTmp->self); releaseRequest(pTmp->self); } else { tscError("0x%" PRIx64 " is not there", tmpRefId); - break; + break; } } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index ea46b70693..e6c2fe114f 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" #include "qworker.h" +#include "tversion.h" static inline void dmSendRsp(SRpcMsg *pMsg) { rpcSendResponse(pMsg); } @@ -73,6 +74,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType), pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId); + int32_t svrVer = 0; + taosVersionStrToInt(version, &svrVer); + if (0 != taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) { + goto _OVER; + } + switch (pRpc->msgType) { case TDMT_DND_NET_TEST: dmProcessNetTestReq(pDnode, pRpc); @@ -305,6 +312,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { @@ -339,7 +347,7 @@ int32_t dmInitServer(SDnode *pDnode) { rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.parent = pDnode; rpcInit.compressSize = tsCompressMsgSize; - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); pTrans->serverRpc = rpcOpen(&rpcInit); if (pTrans->serverRpc == NULL) { dError("failed to init dnode rpc server"); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 3cc7c052cc..7371017111 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -29,6 +29,7 @@ #include "tmsg.h" #include "trpc.h" #include "tmisce.h" +#include "tversion.h" // clang-format on #define UDFD_MAX_SCRIPT_PLUGINS 64 @@ -1038,7 +1039,7 @@ int32_t udfdOpenClientRpc() { connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); global.clientRpc = rpcOpen(&rpcInit); if (global.clientRpc == NULL) { fnError("failed to init dnode rpc client"); diff --git a/source/libs/sync/test/sync_test_lib/src/syncIO.c b/source/libs/sync/test/sync_test_lib/src/syncIO.c index 2e00785586..4f8ae59348 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncIO.c +++ b/source/libs/sync/test/sync_test_lib/src/syncIO.c @@ -21,6 +21,7 @@ #include "tglobal.h" #include "ttimer.h" #include "tutil.h" +#include "tversion.h" bool gRaftDetailLog = false; SSyncIO *gSyncIO = NULL; @@ -188,7 +189,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) { rpcInit.idleTime = 100; rpcInit.user = "sync-io"; rpcInit.connType = TAOS_CONN_CLIENT; - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); io->clientRpc = rpcOpen(&rpcInit); if (io->clientRpc == NULL) { sError("failed to initialize RPC"); @@ -209,7 +210,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) { rpcInit.idleTime = 2 * 1500; rpcInit.parent = io; rpcInit.connType = TAOS_CONN_SERVER; - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); void *pRpc = rpcOpen(&rpcInit); if (pRpc == NULL) { sError("failed to start RPC server"); @@ -470,11 +471,10 @@ static void syncIOTickPing(void *param, void *tmrId) { taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer); } -void syncEntryDestory(SSyncRaftEntry* pEntry) {} +void syncEntryDestory(SSyncRaftEntry *pEntry) {} - -void syncUtilMsgNtoH(void* msg) { - SMsgHead* pHead = msg; +void syncUtilMsgNtoH(void *msg) { + SMsgHead *pHead = msg; pHead->contLen = ntohl(pHead->contLen); pHead->vgId = ntohl(pHead->vgId); } @@ -487,9 +487,9 @@ static inline bool syncUtilCanPrint(char c) { } } -char* syncUtilPrintBin(char* ptr, uint32_t len) { +char *syncUtilPrintBin(char *ptr, uint32_t len) { int64_t memLen = (int64_t)(len + 1); - char* s = taosMemoryMalloc(memLen); + char *s = taosMemoryMalloc(memLen); ASSERT(s != NULL); memset(s, 0, len + 1); memcpy(s, ptr, len); @@ -502,13 +502,13 @@ char* syncUtilPrintBin(char* ptr, uint32_t len) { return s; } -char* syncUtilPrintBin2(char* ptr, uint32_t len) { +char *syncUtilPrintBin2(char *ptr, uint32_t len) { uint32_t len2 = len * 4 + 1; - char* s = taosMemoryMalloc(len2); + char *s = taosMemoryMalloc(len2); ASSERT(s != NULL); memset(s, 0, len2); - char* p = s; + char *p = s; for (int32_t i = 0; i < len; ++i) { int32_t n = sprintf(p, "%d,", ptr[i]); p += n; @@ -516,7 +516,7 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len) { return s; } -void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) { +void syncUtilU642Addr(uint64_t u64, char *host, int64_t len, uint16_t *port) { uint32_t hostU32 = (uint32_t)((u64 >> 32) & 0x00000000FFFFFFFF); struct in_addr addr = {.s_addr = hostU32}; @@ -524,7 +524,7 @@ void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) { *port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16); } -uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { +uint64_t syncUtilAddr2U64(const char *host, uint16_t port) { uint32_t hostU32 = taosGetIpv4FromFqdn(host); if (hostU32 == (uint32_t)-1) { sError("failed to resolve ipv4 addr, host:%s", host); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a2c486767f..f5570f6afd 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -154,6 +154,7 @@ typedef struct { #pragma pack(push, 1) +#define TRANS_VER 2 typedef struct { char version : 4; // RPC version char comp : 2; // compression algorithm, 0:no compression 1:lz4 @@ -167,6 +168,7 @@ typedef struct { uint64_t timestamp; char user[TSDB_UNI_LEN]; uint32_t magicNum; + uint32_t compatibilityVer; STraceId traceId; uint64_t ahandle; // ahandle assigned by client uint32_t code; // del later diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 8ea0064d44..9ef2373b3f 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -46,9 +46,9 @@ typedef struct { int8_t connType; char label[TSDB_LABEL_LEN]; char user[TSDB_UNI_LEN]; // meter ID - - int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size - int8_t encryption; // encrypt or not + int32_t compatibilityVer; + int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size + int8_t encryption; // encrypt or not int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 0771f9198a..08b0451982 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -50,6 +50,7 @@ void* rpcOpen(const SRpcInit* pInit) { } pRpc->encryption = pInit->encryption; + pRpc->compatibilityVer = pInit->compatibilityVer; pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval pRpc->retryStepFactor = pInit->retryStepFactor; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1709fc3cb1..8af4427e22 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -984,11 +984,10 @@ void cliSendBatch(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - SCliBatch* pBatch = pConn->pBatch; - SCliBatchList* pList = pBatch->pList; - pList->connCnt += 1; + SCliBatch* pBatch = pConn->pBatch; + int32_t wLen = pBatch->wLen; - int32_t wLen = pBatch->wLen; + pBatch->pList->connCnt += 1; uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t)); int i = 0; @@ -1018,6 +1017,8 @@ void cliSendBatch(SCliConn* pConn) { memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); pHead->traceId = pMsg->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); + pHead->version = TRANS_VER; + pHead->compatibilityVer = htonl(pTransInst->compatibilityVer); } pHead->timestamp = taosHton64(taosGetTimestampUs()); @@ -1074,6 +1075,8 @@ void cliSend(SCliConn* pConn) { memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); pHead->traceId = pMsg->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); + pHead->version = TRANS_VER; + pHead->compatibilityVer = htonl(pTransInst->compatibilityVer); } pHead->timestamp = taosHton64(taosGetTimestampUs()); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 0dfc7677b3..b14db9497e 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -192,7 +192,7 @@ bool transReadComplete(SConnBuffer* connBuf) { memcpy((char*)&head, connBuf->buf, sizeof(head)); int32_t msgLen = (int32_t)htonl(head.msgLen); p->total = msgLen; - p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)); + p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)) || head.version != TRANS_VER; } if (p->total >= p->len) { p->left = p->total - p->len; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index da3b0ad626..496295938a 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -236,8 +236,8 @@ static bool uvHandleReq(SSvrConn* pConn) { if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); if (cost >= EXCEPTION_LIMIT_US) { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", transLabel(pTransInst), - pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", + transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); } else { tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); @@ -245,8 +245,8 @@ static bool uvHandleReq(SSvrConn* pConn) { } else { if (cost >= EXCEPTION_LIMIT_US) { tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception", - transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, - transMsg.code, (int)(cost)); + transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, + transMsg.code, (int)(cost)); } else { tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus", transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, @@ -262,6 +262,7 @@ static bool uvHandleReq(SSvrConn* pConn) { transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId); transMsg.info.refId = pConn->refId; transMsg.info.traceId = pHead->traceId; + transMsg.info.cliVer = htonl(pHead->compatibilityVer); tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn, pConn->refId); @@ -410,6 +411,8 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; pHead->magicNum = htonl(TRANS_MAGIC_NUM); + pHead->compatibilityVer = 0; + pHead->version = TRANS_VER; // handle invalid drop_task resp, TD-20098 if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { diff --git a/source/libs/transport/test/cliBench.c b/source/libs/transport/test/cliBench.c index aaee162cd7..8a5276b814 100644 --- a/source/libs/transport/test/cliBench.c +++ b/source/libs/transport/test/cliBench.c @@ -19,6 +19,7 @@ #include "transLog.h" #include "trpc.h" #include "tutil.h" +#include "tversion.h" typedef struct { int index; @@ -155,7 +156,7 @@ int main(int argc, char *argv[]) { } initLogEnv(); - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); void *pRpc = rpcOpen(&rpcInit); if (pRpc == NULL) { tError("failed to initialize RPC"); diff --git a/source/libs/transport/test/svrBench.c b/source/libs/transport/test/svrBench.c index 4e2395b17b..a3fa81662c 100644 --- a/source/libs/transport/test/svrBench.c +++ b/source/libs/transport/test/svrBench.c @@ -13,12 +13,13 @@ * along with this program. If not, see . */ -//#define _DEFAULT_SOURCE +// #define _DEFAULT_SOURCE #include "os.h" #include "tglobal.h" #include "tqueue.h" #include "transLog.h" #include "trpc.h" +#include "tversion.h" int msgSize = 128; int commit = 0; @@ -151,6 +152,8 @@ int main(int argc, char *argv[]) { rpcInit.numOfThreads = 1; rpcInit.cfp = processRequestMsg; rpcInit.idleTime = 2 * 1500; + + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); rpcDebugFlag = 131; for (int i = 1; i < argc; ++i) { @@ -187,7 +190,7 @@ int main(int argc, char *argv[]) { rpcInit.connType = TAOS_CONN_SERVER; initLogEnv(); - + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); void *pRpc = rpcOpen(&rpcInit); if (pRpc == NULL) { tError("failed to start RPC server"); diff --git a/tools/shell/src/shellNettest.c b/tools/shell/src/shellNettest.c index 1a6ac3489d..9fe92212ca 100644 --- a/tools/shell/src/shellNettest.c +++ b/tools/shell/src/shellNettest.c @@ -15,6 +15,7 @@ #define _GNU_SOURCE #include "shellInt.h" +#include "tversion.h" static void shellWorkAsClient() { SShellArgs *pArgs = &shell.args; @@ -33,6 +34,7 @@ static void shellWorkAsClient() { rpcInit.user = "_dnd"; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) { printf("failed to init net test client since %s\r\n", terrstr()); @@ -123,6 +125,8 @@ static void shellWorkAsServer() { rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1000; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); + void *serverRpc = rpcOpen(&rpcInit); if (serverRpc == NULL) { printf("failed to init net test server since %s\r\n", terrstr()); From af053ec75bc0c0d65eb318b534ebe4101aef2721 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 5 Jul 2023 11:15:12 +0000 Subject: [PATCH 03/24] add version check in rpc --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 1 + source/libs/transport/src/transCli.c | 1 + source/libs/transport/src/transSvr.c | 3 ++- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index e6c2fe114f..a7a7273bd0 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -77,6 +77,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { int32_t svrVer = 0; taosVersionStrToInt(version, &svrVer); if (0 != taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) { + dError("msg ver: %d, curr ver: %d", pRpc->info.cliVer, svrVer); goto _OVER; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8af4427e22..e2d4983e57 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -391,6 +391,7 @@ void cliHandleResp(SCliConn* conn) { transMsg.info.ahandle = NULL; transMsg.info.traceId = pHead->traceId; transMsg.info.hasEpSet = pHead->hasEpSet; + transMsg.info.cliVer = pHead->compatibilityVer; SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 496295938a..0f165d04b2 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -196,6 +196,7 @@ static bool uvHandleReq(SSvrConn* pConn) { tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn); return false; } + tDebug("head version: %d 2", pHead->version); pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); @@ -411,7 +412,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; pHead->magicNum = htonl(TRANS_MAGIC_NUM); - pHead->compatibilityVer = 0; + pHead->compatibilityVer = ((STrans*)pConn->pTransInst)->compatibilityVer; pHead->version = TRANS_VER; // handle invalid drop_task resp, TD-20098 From 34a62c3f3845ef8bd72b417039da7edfe0d28f49 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 5 Jul 2023 22:55:41 +0800 Subject: [PATCH 04/24] fix:vginfo assigment not in same time,we should get vginfo by vg,not all vg --- source/client/src/clientTmq.c | 14 ++++++++------ source/dnode/mnode/impl/src/mndSubscribe.c | 2 +- source/dnode/vnode/src/tq/tq.c | 3 ++- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index accaa8fa1d..62cb5d2bb2 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2565,15 +2565,15 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } bool needFetch = false; - + int32_t index = 0; for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); if (!pClientVg->receivedInfoFromVnode) { needFetch = true; - break; + continue; } - tmq_topic_assignment* pAssignment = &(*assignment)[j]; + tmq_topic_assignment* pAssignment = &(*assignment)[index++]; if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) { pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version; } else { @@ -2603,7 +2603,9 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a terrno = TSDB_CODE_OUT_OF_MEMORY; for (int32_t i = 0; i < (*numOfAssignment); ++i) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - + if (pClientVg->receivedInfoFromVnode) { + continue; + } SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam)); if (pParam == NULL) { destroyCommonInfo(pCommon); @@ -2674,11 +2676,11 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } else { int32_t num = taosArrayGetSize(pCommon->pList); for(int32_t i = 0; i < num; ++i) { - (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i); + (*assignment)[index++] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i); tscInfo("consumer:0x%" PRIx64 " get assignment from server:%d->%" PRId64, tmq->consumerId, (*assignment)[i].vgId, (*assignment)[i].currentOffset); } - *numOfAssignment = num; + *numOfAssignment = index; } // for (int32_t j = 0; j < (*numOfAssignment); ++j) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 7ecd994b5a..48de21199b 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -275,7 +275,7 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosArrayPush(pOutput->newConsumers, &consumerId); - mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId); + mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, pSubKey, consumerId); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0989b980ea..baba735d6c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -571,7 +571,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { dataRsp.rspOffset.type = TMQ_OFFSET__LOG; dataRsp.rspOffset.version = pOffset->val.version; - tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from stroe:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); } else { if (req.useSnapshot == true) { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey); @@ -593,6 +593,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { tDeleteMqDataRsp(&dataRsp); return -1; } + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); } tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever); From d0d8d93c94f68bb1ac58271a38423140de622556 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 01:29:49 +0000 Subject: [PATCH 05/24] add version check in rpc --- source/libs/transport/src/transCli.c | 2 +- source/libs/transport/src/transSvr.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e2d4983e57..227fc63680 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -391,7 +391,7 @@ void cliHandleResp(SCliConn* conn) { transMsg.info.ahandle = NULL; transMsg.info.traceId = pHead->traceId; transMsg.info.hasEpSet = pHead->hasEpSet; - transMsg.info.cliVer = pHead->compatibilityVer; + transMsg.info.cliVer = htonl(pHead->compatibilityVer); SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 0f165d04b2..f23e176c79 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -412,7 +412,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; pHead->magicNum = htonl(TRANS_MAGIC_NUM); - pHead->compatibilityVer = ((STrans*)pConn->pTransInst)->compatibilityVer; + pHead->compatibilityVer = htonl(((STrans*)pConn->pTransInst)->compatibilityVer); pHead->version = TRANS_VER; // handle invalid drop_task resp, TD-20098 From a630d1284ca79f33d3043fe7ea56ecd37dadc13f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 02:03:31 +0000 Subject: [PATCH 06/24] add version check in rpc --- source/dnode/mgmt/test/sut/src/client.cpp | 2 ++ source/libs/transport/test/transUT.cpp | 8 ++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/test/sut/src/client.cpp b/source/dnode/mgmt/test/sut/src/client.cpp index a27a511651..95eea2359d 100644 --- a/source/dnode/mgmt/test/sut/src/client.cpp +++ b/source/dnode/mgmt/test/sut/src/client.cpp @@ -16,6 +16,7 @@ #include "sut.h" #include "tdatablock.h" #include "tmisce.h" +#include "tversion.h" static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) { TestClient* client = (TestClient*)parent; @@ -53,6 +54,7 @@ void TestClient::DoInit() { rpcInit.parent = this; // rpcInit.secret = (char*)secretEncrypt; // rpcInit.spi = 1; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); clientRpc = rpcOpen(&rpcInit); ASSERT(clientRpc); diff --git a/source/libs/transport/test/transUT.cpp b/source/libs/transport/test/transUT.cpp index 88a1e2564f..2fa94c358f 100644 --- a/source/libs/transport/test/transUT.cpp +++ b/source/libs/transport/test/transUT.cpp @@ -18,10 +18,10 @@ #include "tdatablock.h" #include "tglobal.h" #include "tlog.h" +#include "tmisce.h" #include "transLog.h" #include "trpc.h" -#include "tmisce.h" - +#include "tversion.h" using namespace std; const char *label = "APP"; @@ -54,6 +54,8 @@ class Client { rpcInit_.user = (char *)user; rpcInit_.parent = this; rpcInit_.connType = TAOS_CONN_CLIENT; + + taosVersionStrToInt(version, &(rpcInit_.compatibilityVer)); this->transCli = rpcOpen(&rpcInit_); tsem_init(&this->sem, 0, 0); } @@ -66,6 +68,7 @@ class Client { void Restart(CB cb) { rpcClose(this->transCli); rpcInit_.cfp = cb; + taosVersionStrToInt(version, &(rpcInit_.compatibilityVer)); this->transCli = rpcOpen(&rpcInit_); } void Stop() { @@ -117,6 +120,7 @@ class Server { rpcInit_.cfp = processReq; rpcInit_.user = (char *)user; rpcInit_.connType = TAOS_CONN_SERVER; + taosVersionStrToInt(version, &(rpcInit_.compatibilityVer)); } void Start() { this->transSrv = rpcOpen(&this->rpcInit_); From d061c54a180e4d9b239c606c0eeb63942c839b5c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 06:41:06 +0000 Subject: [PATCH 07/24] add ver check --- source/libs/transport/inc/transportInt.h | 2 +- source/libs/transport/src/transCli.c | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 9ef2373b3f..ca48da690b 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -49,7 +49,7 @@ typedef struct { int32_t compatibilityVer; int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int8_t encryption; // encrypt or not - + int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor int32_t retryMaxInterval; // retry max interval diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 227fc63680..8062a0618b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -489,6 +489,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; transMsg.info.ahandle = NULL; + transMsg.info.cliVer = pTransInst->compatibilityVer; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); @@ -1350,6 +1351,7 @@ static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) { transMsg.info.ahandle = pMsg->ctx->ahandle; transMsg.info.traceId = pMsg->msg.info.traceId; transMsg.info.hasEpSet = false; + transMsg.info.cliVer = pTransInst->compatibilityVer; if (pCtx->pSem != NULL) { if (pCtx->pRsp == NULL) { } else { @@ -1531,6 +1533,9 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { // persist conn already release by server STransMsg resp; cliBuildExceptResp(pMsg, &resp); + // refactorr later + resp.info.cliVer = pTransInst->compatibilityVer; + if (pMsg->type != Release) { pTransInst->cfp(pTransInst->parent, &resp, NULL); } @@ -1840,6 +1845,7 @@ void cliIteraConnMsgs(SCliConn* conn) { if (-1 == cliBuildExceptResp(cmsg, &resp)) { continue; } + resp.info.cliVer = pTransInst->compatibilityVer; pTransInst->cfp(pTransInst->parent, &resp, NULL); cmsg->ctx->ahandle = NULL; From 5e156aa0a9b0eaf153ec2bf0ca207e498529a76b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 06:43:07 +0000 Subject: [PATCH 08/24] add ver check --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index a7a7273bd0..5d6d16ccf8 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -77,7 +77,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { int32_t svrVer = 0; taosVersionStrToInt(version, &svrVer); if (0 != taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) { - dError("msg ver: %d, curr ver: %d", pRpc->info.cliVer, svrVer); + dError("Version not compatible, cli ver: %d, svr ver: %d", pRpc->info.cliVer, svrVer); goto _OVER; } From a1a2b8331f483f005e487440902d7d2136cc5214 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Mon, 3 Jul 2023 06:39:47 -0400 Subject: [PATCH 09/24] fix: ttl fill cache only in initialization --- source/dnode/vnode/src/inc/metaTtl.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/meta/metaCommit.c | 4 -- source/dnode/vnode/src/meta/metaOpen.c | 85 ++++++++++++------------ source/dnode/vnode/src/meta/metaTtl.c | 6 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 9 ++- 6 files changed, 56 insertions(+), 51 deletions(-) diff --git a/source/dnode/vnode/src/inc/metaTtl.h b/source/dnode/vnode/src/inc/metaTtl.h index bf3b897c6f..428f4438b6 100644 --- a/source/dnode/vnode/src/inc/metaTtl.h +++ b/source/dnode/vnode/src/inc/metaTtl.h @@ -81,7 +81,7 @@ typedef struct { int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback); int ttlMgrClose(STtlManger* pTtlMgr); -int ttlMgrBegin(STtlManger* pTtlMgr, void* pMeta); +int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta); int ttlMgrConvert(TTB* pOldTtlIdx, TTB* pNewTtlIdx, void* pMeta); int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index a9541d8c47..54bbeaea88 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -136,6 +136,7 @@ typedef struct STbUidStore STbUidStore; #define META_BEGIN_HEAP_NIL 2 int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback); +int metaPostOpen(SVnode* pVnode, SMeta** ppMeta); // for operations depend on "meta txn" int metaClose(SMeta** pMeta); int metaBegin(SMeta* pMeta, int8_t fromSys); TXN* metaGetTxn(SMeta* pMeta); diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c index 1fa5b9c1e9..d262567953 100644 --- a/source/dnode/vnode/src/meta/metaCommit.c +++ b/source/dnode/vnode/src/meta/metaCommit.c @@ -40,10 +40,6 @@ int metaBegin(SMeta *pMeta, int8_t heap) { return -1; } - if (ttlMgrBegin(pMeta->pTtlMgr, pMeta) < 0) { - return -1; - } - tdbCommit(pMeta->pEnv, pMeta->txn); return 0; diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index fb17aff318..7469ddfcc3 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -29,6 +29,8 @@ static int ncolIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); } static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); } +static void metaCleanup(SMeta **ppMeta); + int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { SMeta *pMeta = NULL; int ret; @@ -180,51 +182,26 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { return 0; _err: - if (pMeta->pIdx) metaCloseIdx(pMeta); - if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); - if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx); - if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx); - if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); - if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr); - if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); - if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); - if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); - if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx); - if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx); - if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx); - if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb); - if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb); - if (pMeta->pEnv) tdbClose(pMeta->pEnv); - metaDestroyLock(pMeta); - taosMemoryFree(pMeta); + metaCleanup(&pMeta); + return -1; +} + +int metaPostOpen(SVnode *pVnode, SMeta **ppMeta) { + SMeta *pMeta = *ppMeta; + if (ttlMgrPostOpen(pMeta->pTtlMgr, pMeta) < 0) { + metaError("vgId:%d, failed to post open meta ttl since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + + return 0; + +_err: + metaCleanup(ppMeta); return -1; } int metaClose(SMeta **ppMeta) { - SMeta *pMeta = *ppMeta; - if (pMeta) { - if (pMeta->pEnv) metaAbort(pMeta); - if (pMeta->pCache) metaCacheClose(pMeta); - if (pMeta->pIdx) metaCloseIdx(pMeta); - if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); - if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx); - if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx); - if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); - if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr); - if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); - if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); - if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); - if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx); - if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx); - if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx); - if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb); - if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb); - if (pMeta->pEnv) tdbClose(pMeta->pEnv); - metaDestroyLock(pMeta); - - taosMemoryFreeClear(*ppMeta); - } - + metaCleanup(ppMeta); return 0; } @@ -270,6 +247,32 @@ int32_t metaULock(SMeta *pMeta) { return ret; } +static void metaCleanup(SMeta **ppMeta) { + SMeta *pMeta = *ppMeta; + if (pMeta) { + if (pMeta->pEnv) metaAbort(pMeta); + if (pMeta->pCache) metaCacheClose(pMeta); + if (pMeta->pIdx) metaCloseIdx(pMeta); + if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); + if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx); + if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx); + if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); + if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr); + if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); + if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); + if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); + if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx); + if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx); + if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx); + if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb); + if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb); + if (pMeta->pEnv) tdbClose(pMeta->pEnv); + metaDestroyLock(pMeta); + + taosMemoryFreeClear(*ppMeta); + } +} + static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) { STbDbKey *pTbDbKey1 = (STbDbKey *)pKey1; STbDbKey *pTbDbKey2 = (STbDbKey *)pKey2; diff --git a/source/dnode/vnode/src/meta/metaTtl.c b/source/dnode/vnode/src/meta/metaTtl.c index af4827a9c7..7aecf1f203 100644 --- a/source/dnode/vnode/src/meta/metaTtl.c +++ b/source/dnode/vnode/src/meta/metaTtl.c @@ -79,8 +79,8 @@ int ttlMgrClose(STtlManger *pTtlMgr) { return 0; } -int ttlMgrBegin(STtlManger *pTtlMgr, void *pMeta) { - metaInfo("ttl mgr start open"); +int ttlMgrPostOpen(STtlManger *pTtlMgr, void *pMeta) { + metaInfo("ttl mgr start post open"); int ret; int64_t startNs = taosGetTimestampNs(); @@ -112,7 +112,7 @@ int ttlMgrBegin(STtlManger *pTtlMgr, void *pMeta) { int64_t endNs = taosGetTimestampNs(); - metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), + metaInfo("ttl mgr post open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs); _out: return ret; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 583df15533..22750af1c7 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -76,7 +76,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p } SSyncCfg *pCfg = &info.config.syncCfg; - + pCfg->replicaNum = 0; pCfg->totalReplicaNum = 0; memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo)); @@ -109,7 +109,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex; } - vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", + vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex); info.config.syncCfg = *pCfg; @@ -416,6 +416,11 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { goto _err; } + if (metaPostOpen(pVnode, &pVnode->pMeta) < 0) { + vError("vgId:%d, failed to post open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + // open sync if (vnodeSyncOpen(pVnode, dir)) { vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); From a31e054146e0fd7962d0127e418a9e25bacaeba6 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Wed, 5 Jul 2023 15:58:09 +0800 Subject: [PATCH 10/24] fix: ttlmgr convert in metaUpgrade --- source/dnode/vnode/src/inc/metaTtl.h | 12 +-- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/meta/metaOpen.c | 29 +++++-- source/dnode/vnode/src/meta/metaTtl.c | 103 +++++++++++++++---------- source/dnode/vnode/src/vnd/vnodeOpen.c | 9 +-- 5 files changed, 96 insertions(+), 59 deletions(-) diff --git a/source/dnode/vnode/src/inc/metaTtl.h b/source/dnode/vnode/src/inc/metaTtl.h index 428f4438b6..a3d3ceab24 100644 --- a/source/dnode/vnode/src/inc/metaTtl.h +++ b/source/dnode/vnode/src/inc/metaTtl.h @@ -79,16 +79,18 @@ typedef struct { TXN* pTxn; } STtlDelTtlCtx; -int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback); -int ttlMgrClose(STtlManger* pTtlMgr); -int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta); +int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback); +void ttlMgrClose(STtlManger* pTtlMgr); +int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta); -int ttlMgrConvert(TTB* pOldTtlIdx, TTB* pNewTtlIdx, void* pMeta); -int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn); +bool ttlMgrNeedUpgrade(TDB* pEnv); +int ttlMgrUpgrade(STtlManger* pTtlMgr, void* pMeta); int ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx); int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx); int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx); + +int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn); int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 54bbeaea88..cbf0933358 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -136,7 +136,7 @@ typedef struct STbUidStore STbUidStore; #define META_BEGIN_HEAP_NIL 2 int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback); -int metaPostOpen(SVnode* pVnode, SMeta** ppMeta); // for operations depend on "meta txn" +int metaUpgrade(SVnode* pVnode, SMeta** ppMeta); int metaClose(SMeta** pMeta); int metaBegin(SMeta* pMeta, int8_t fromSys); TXN* metaGetTxn(SMeta* pMeta); diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 7469ddfcc3..511cc8d6ec 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -186,18 +186,35 @@ _err: return -1; } -int metaPostOpen(SVnode *pVnode, SMeta **ppMeta) { +int metaUpgrade(SVnode *pVnode, SMeta **ppMeta) { + int code = TSDB_CODE_SUCCESS; SMeta *pMeta = *ppMeta; - if (ttlMgrPostOpen(pMeta->pTtlMgr, pMeta) < 0) { - metaError("vgId:%d, failed to post open meta ttl since %s", TD_VID(pVnode), tstrerror(terrno)); - goto _err; + + if (ttlMgrNeedUpgrade(pMeta->pEnv)) { + code = metaBegin(pMeta, META_BEGIN_HEAP_OS); + if (code < 0) { + metaError("vgId:%d, failed to upgrade meta, meta begin failed since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + + code = ttlMgrUpgrade(pMeta->pTtlMgr, pMeta); + if (code < 0) { + metaError("vgId:%d, failed to upgrade meta ttl since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + + code = metaCommit(pMeta, pMeta->txn); + if (code < 0) { + metaError("vgId:%d, failed to upgrade meta ttl, meta commit failed since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } } - return 0; + return TSDB_CODE_SUCCESS; _err: metaCleanup(ppMeta); - return -1; + return code; } int metaClose(SMeta **ppMeta) { diff --git a/source/dnode/vnode/src/meta/metaTtl.c b/source/dnode/vnode/src/meta/metaTtl.c index 7aecf1f203..c6cb826149 100644 --- a/source/dnode/vnode/src/meta/metaTtl.c +++ b/source/dnode/vnode/src/meta/metaTtl.c @@ -21,6 +21,10 @@ typedef struct { SMeta *pMeta; } SConvertData; +static void ttlMgrCleanup(STtlManger *pTtlMgr); + +static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta); + static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid); static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); @@ -36,27 +40,17 @@ const char *ttlTbname = "ttl.idx"; const char *ttlV1Tbname = "ttlv1.idx"; int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { - int ret; + int ret = TSDB_CODE_SUCCESS; + int64_t startNs = taosGetTimestampNs(); *ppTtlMgr = NULL; STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr)); - if (pTtlMgr == NULL) { - return -1; - } - - if (tdbTbExist(ttlTbname, pEnv)) { - ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pEnv, &pTtlMgr->pOldTtlIdx, rollback); - if (ret < 0) { - metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno)); - return ret; - } - } + if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY; ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback); if (ret < 0) { metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno)); - tdbOsFree(pTtlMgr); return ret; } @@ -66,42 +60,57 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { taosThreadRwlockInit(&pTtlMgr->lock, NULL); + ret = ttlMgrFillCache(pTtlMgr); + if (ret < 0) { + metaError("failed to fill hash since %s", tstrerror(terrno)); + ttlMgrCleanup(pTtlMgr); + return ret; + } + + int64_t endNs = taosGetTimestampNs(); + metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), + endNs - startNs); + *ppTtlMgr = pTtlMgr; - return 0; + return TSDB_CODE_SUCCESS; } -int ttlMgrClose(STtlManger *pTtlMgr) { - taosHashCleanup(pTtlMgr->pTtlCache); - taosHashCleanup(pTtlMgr->pDirtyUids); - tdbTbClose(pTtlMgr->pTtlIdx); - taosThreadRwlockDestroy(&pTtlMgr->lock); - tdbOsFree(pTtlMgr); - return 0; +void ttlMgrClose(STtlManger *pTtlMgr) { ttlMgrCleanup(pTtlMgr); } + +bool ttlMgrNeedUpgrade(TDB *pEnv) { + bool needUpgrade = tdbTbExist(ttlTbname, pEnv); + if (needUpgrade) { + metaInfo("find ttl idx in old version , will convert"); + } + return needUpgrade; } -int ttlMgrPostOpen(STtlManger *pTtlMgr, void *pMeta) { - metaInfo("ttl mgr start post open"); - int ret; +int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) { + SMeta *meta = (SMeta *)pMeta; + int ret = TSDB_CODE_SUCCESS; + + if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS; + + metaInfo("ttl mgr start upgrade"); int64_t startNs = taosGetTimestampNs(); - SMeta *meta = (SMeta *)pMeta; + ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0); + if (ret < 0) { + metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno)); + goto _out; + } - if (pTtlMgr->pOldTtlIdx) { - ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta); - if (ret < 0) { - metaError("failed to convert ttl index since %s", tstrerror(terrno)); - goto _out; - } + ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta); + if (ret < 0) { + metaError("failed to convert ttl index since %s", tstrerror(terrno)); + goto _out; + } - ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn); - if (ret < 0) { - metaError("failed to drop old ttl index since %s", tstrerror(terrno)); - goto _out; - } - - tdbTbClose(pTtlMgr->pOldTtlIdx); - pTtlMgr->pOldTtlIdx = NULL; + ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn); + if (ret < 0) { + metaError("failed to drop old ttl index since %s", tstrerror(terrno)); + goto _out; } ret = ttlMgrFillCache(pTtlMgr); @@ -111,13 +120,23 @@ int ttlMgrPostOpen(STtlManger *pTtlMgr, void *pMeta) { } int64_t endNs = taosGetTimestampNs(); - - metaInfo("ttl mgr post open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), + metaInfo("ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs); _out: + tdbTbClose(pTtlMgr->pOldTtlIdx); + pTtlMgr->pOldTtlIdx = NULL; + return ret; } +static void ttlMgrCleanup(STtlManger *pTtlMgr) { + taosHashCleanup(pTtlMgr->pTtlCache); + taosHashCleanup(pTtlMgr->pDirtyUids); + tdbTbClose(pTtlMgr->pTtlIdx); + taosThreadRwlockDestroy(&pTtlMgr->lock); + tdbOsFree(pTtlMgr); +} + static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) { if (ttlDays <= 0) return; @@ -205,7 +224,7 @@ _out: return ret; } -int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) { +static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) { SMeta *meta = pMeta; metaInfo("ttlMgr convert ttl start."); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 22750af1c7..c794c7ebd6 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -371,6 +371,10 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { goto _err; } + if (metaUpgrade(pVnode, &pVnode->pMeta) < 0) { + vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno)); + } + // open tsdb if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) { vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); @@ -416,11 +420,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { goto _err; } - if (metaPostOpen(pVnode, &pVnode->pMeta) < 0) { - vError("vgId:%d, failed to post open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno)); - goto _err; - } - // open sync if (vnodeSyncOpen(pVnode, dir)) { vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); From 196f2b8f65f8676ae8d96125c24a6f996edb42b1 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Thu, 6 Jul 2023 16:45:53 +0800 Subject: [PATCH 11/24] test:comment connection of new client and old server --- tests/system-test/0-others/compatibility.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py index 22e319fdaf..b8623c3bcd 100644 --- a/tests/system-test/0-others/compatibility.py +++ b/tests/system-test/0-others/compatibility.py @@ -151,9 +151,10 @@ class TDTestCase: os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '") os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql") - cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;" - if os.system(cmd) == 0: - raise Exception("failed to execute system command. cmd: %s" % cmd) + # cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;" + # tdLog.info(f"new client version connect to old version taosd, commad return value:{cmd}") + # if os.system(cmd) == 0: + # raise Exception("failed to execute system command. cmd: %s" % cmd) os.system("pkill taosd") # make sure all the data are saved in disk. self.checkProcessPid("taosd") From 9e6e59adc2b27eb0f8564ccdfa82bbf3858ac5fb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 6 Jul 2023 16:47:48 +0800 Subject: [PATCH 12/24] fix:race condition for pTq->pStore->pHash --- source/dnode/vnode/src/tq/tqOffset.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 0a9905b544..11bb737225 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -104,7 +104,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { pStore->needCommit = 0; pTq->pOffsetStore = pStore; - pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); + pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); if (pStore->pHash == NULL) { taosMemoryFree(pStore); return NULL; From 09222801c490e1570919d42bc0a6b2a6ff015b50 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 6 Jul 2023 17:52:48 +0800 Subject: [PATCH 13/24] fix(coverity): fix coverity scan issues --- source/common/src/trow.c | 2 +- source/dnode/vnode/src/meta/metaTable.c | 7 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 115 +++++++----------------- source/libs/tdb/src/db/tdbBtree.c | 10 +-- source/libs/tdb/src/db/tdbDb.c | 5 +- source/libs/tdb/src/db/tdbPager.c | 1 + 6 files changed, 48 insertions(+), 92 deletions(-) diff --git a/source/common/src/trow.c b/source/common/src/trow.c index 8ae77bcd0a..039f436505 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -423,7 +423,7 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) { val = (const void *)&pColVal->value.val; } } else { - pColVal = NULL; + // pColVal = NULL; valType = TD_VTYPE_NONE; } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index eb2d2e267b..32b63fa950 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -207,7 +207,10 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { tb_uid_t uid = *(tb_uid_t *)pData; tdbFree(pData); SMetaInfo info; - metaGetInfo(pMeta, uid, &info, NULL); + if (metaGetInfo(pMeta, uid, &info, NULL) == TSDB_CODE_NOT_FOUND) { + terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; + return -1; + } if (info.uid == info.suid) { return 0; } else { @@ -939,7 +942,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) { return 0; } - metaInfo("ttl find expired table count: %zu" , TARRAY_SIZE(tbUids)); + metaInfo("ttl find expired table count: %zu", TARRAY_SIZE(tbUids)); metaDropTables(pMeta, tbUids); return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 31b13b8411..f4e888919e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1031,7 +1031,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache return code; } - +/* int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { int32_t code = 0; SLRUCache *pCache = pTsdb->lruCache; @@ -1079,7 +1079,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR return code; } - +*/ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t code = 0; // fetch schema @@ -1829,10 +1829,11 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa } *pIgnoreEarlierTs = false; + /* if (!hasVal) { state->state = SFSLASTNEXTROW_FILESET; } - + */ if (!state->checkRemainingRow) { state->checkRemainingRow = true; } @@ -2020,10 +2021,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk); if (block.maxKey.ts <= state->lastTs) { *pIgnoreEarlierTs = true; - if (state->pBlockData) { - tBlockDataDestroy(state->pBlockData); - state->pBlockData = NULL; - } + + tBlockDataDestroy(state->pBlockData); + state->pBlockData = NULL; *ppRow = NULL; return code; @@ -3176,97 +3176,46 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, TSKEY rowTs = TSDBROW_TS(pRow); - if (lastRowTs == TSKEY_MAX) { - lastRowTs = rowTs; + lastRowTs = rowTs; - for (int16_t iCol = noneCol; iCol < nCols; ++iCol) { - if (iCol >= nLastCol) { - break; - } - SLastCol *pCol = taosArrayGet(pColArray, iCol); - if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) { - continue; - } - if (slotIds[iCol] == 0) { - STColumn *pTColumn = &pTSchema->columns[0]; - - *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = rowTs}); - taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal}); - continue; - } - tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - - *pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal}; - if (IS_VAR_DATA_TYPE(pColVal->type) /*&& pColVal->value.nData > 0*/) { - pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); - if (pCol->colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - if (pColVal->value.nData > 0) { - memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); - } - } - - /*if (COL_VAL_IS_NONE(pColVal)) { - if (!setNoneCol) { - noneCol = iCol; - setNoneCol = true; - } - } else {*/ - int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ); - if (aColIndex >= 0) { - taosArrayRemove(aColArray, aColIndex); - } - //} - } - if (!setNoneCol) { - // done, goto return pColArray - break; - } else { - continue; - } - } - - // merge into pColArray - setNoneCol = false; for (int16_t iCol = noneCol; iCol < nCols; ++iCol) { if (iCol >= nLastCol) { break; } - // high version's column value - SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol); - if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) { + SLastCol *pCol = taosArrayGet(pColArray, iCol); + if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) { continue; } - SColVal *tColVal = &lastColVal->colVal; + if (slotIds[iCol] == 0) { + STColumn *pTColumn = &pTSchema->columns[0]; + *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = rowTs}); + taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal}); + continue; + } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { - SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal}; - if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { - SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); - taosMemoryFree(pLastCol->colVal.value.pData); - lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); - if (lastCol.colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + *pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal}; + if (IS_VAR_DATA_TYPE(pColVal->type) /*&& pColVal->value.nData > 0*/) { + pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); + if (pCol->colVal.value.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; } + if (pColVal->value.nData > 0) { + memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + } - taosArraySet(pColArray, iCol, &lastCol); - int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ); + int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ); + if (aColIndex >= 0) { taosArrayRemove(aColArray, aColIndex); - } else if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal) && !setNoneCol) { - noneCol = iCol; - setNoneCol = true; } } - } while (setNoneCol); + + break; + } while (1); if (!hasRow) { if (ignoreEarlierTs) { diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index c49b5726b6..db05f3e287 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -345,7 +345,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL } *ppKey = pTKey; *pkLen = cd.kLen; - memcpy(*ppKey, cd.pKey, cd.kLen); + memcpy(*ppKey, cd.pKey, (size_t)cd.kLen); } if (ppVal) { @@ -357,7 +357,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL } *ppVal = pTVal; *vLen = cd.vLen; - memcpy(*ppVal, cd.pVal, cd.vLen); + memcpy(*ppVal, cd.pVal, (size_t)cd.vLen); } if (TDB_CELLDECODER_FREE_KEY(&cd)) { @@ -1793,7 +1793,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { *ppKey = pKey; *kLen = cd.kLen; - memcpy(pKey, cd.pKey, cd.kLen); + memcpy(pKey, cd.pKey, (size_t)cd.kLen); if (ppVal) { if (cd.vLen > 0) { @@ -1852,7 +1852,7 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { *ppKey = pKey; *kLen = cd.kLen; - memcpy(pKey, cd.pKey, cd.kLen); + memcpy(pKey, cd.pKey, (size_t)cd.kLen); if (ppVal) { // TODO: vLen may be zero @@ -1864,7 +1864,7 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { *ppVal = pVal; *vLen = cd.vLen; - memcpy(pVal, cd.pVal, cd.vLen); + memcpy(pVal, cd.pVal, (size_t)cd.vLen); } ret = tdbBtcMoveToPrev(pBtc); diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index 952c49db73..0e9f192226 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -62,7 +62,10 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i } memset(pDb->pgrHash, 0, tsize); - taosMulModeMkDir(dbname, 0755); + ret = taosMulModeMkDir(dbname, 0755); + if (ret < 0) { + return -1; + } #ifdef USE_MAINDB // open main db diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 5ea9be63db..5144a94a25 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -980,6 +980,7 @@ int tdbPagerRestoreJournals(SPager *pPager) { jname[dirLen] = '/'; sprintf(jname + dirLen + 1, TDB_MAINDB_NAME "-journal.%" PRId64, *pTxnId); if (tdbPagerRestore(pPager, jname) < 0) { + taosArrayDestroy(pTxnList); tdbCloseDir(&pDir); tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), jname); From 3bd99776d28bf7d537a44aa2453ffab6ccda88a5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 18:03:15 +0800 Subject: [PATCH 14/24] add rpc version check --- source/libs/transport/inc/transComm.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index f5570f6afd..3b304e2c77 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -167,8 +167,8 @@ typedef struct { uint64_t timestamp; char user[TSDB_UNI_LEN]; + int32_t compatibilityVer; uint32_t magicNum; - uint32_t compatibilityVer; STraceId traceId; uint64_t ahandle; // ahandle assigned by client uint32_t code; // del later From 5964e7819c548faf9382b12e23de8142d32ac6fd Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 6 Jul 2023 19:38:37 +0800 Subject: [PATCH 15/24] test: add check --- tests/system-test/7-tmq/tmqDropConsumer.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/system-test/7-tmq/tmqDropConsumer.py b/tests/system-test/7-tmq/tmqDropConsumer.py index 336285a39e..06ce4c0fd7 100644 --- a/tests/system-test/7-tmq/tmqDropConsumer.py +++ b/tests/system-test/7-tmq/tmqDropConsumer.py @@ -256,6 +256,7 @@ class TDTestCase: tdLog.info("all consumers status into 'lost'") # drop consumer groups + tdLog.info("drop all consumers") for i in range(len(groupIdList)): for j in range(len(topicNameList)): sqlCmd = f"drop consumer group `%s` on %s"%(groupIdList[i], topicNameList[j]) @@ -265,7 +266,17 @@ class TDTestCase: tmqCom.g_end_insert_flag = 1 tdLog.debug("notify sub-thread to stop insert data") pThread.join() + + tdSql.query('show consumers;') + consumerNUm = tdSql.queryRows + tdSql.query('show subscriptions;') + subscribeNum = tdSql.queryRows + + if (0 != consumerNUm or 0 != subscribeNum): + tdLog.exit("drop consumer fail! consumerNUm %d, subscribeNum: %d"%(consumerNUm, subscribeNum)) + + tdLog.info("drop consuer success, there is no consumers and subscribes") tdLog.printNoPrefix("======== test case 1 end ...... ") def run(self): From e7ced3e9ccf216fd05611fa38954dcb370d2248a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 7 Jul 2023 09:04:44 +0800 Subject: [PATCH 16/24] fix: desc table without permission to view any column issue --- source/libs/command/src/command.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index f59653700b..b004539e16 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -87,7 +87,7 @@ static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) { return code; } -static void setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, int32_t numOfRows, STableMeta* pMeta) { +static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, int32_t numOfRows, STableMeta* pMeta) { blockDataEnsureCapacity(pBlock, numOfRows); pBlock->info.rows = 0; @@ -114,6 +114,11 @@ static void setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, in colDataSetVal(pCol4, pBlock->info.rows, buf, false); ++(pBlock->info.rows); } + if (pBlock->info.rows <= 0) { + qError("no permission to view any columns"); + return TSDB_CODE_PAR_PERMISSION_DENIED; + } + return TSDB_CODE_SUCCESS; } static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** pRsp) { @@ -123,7 +128,7 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** SSDataBlock* pBlock = NULL; int32_t code = buildDescResultDataBlock(&pBlock); if (TSDB_CODE_SUCCESS == code) { - setDescResultIntoDataBlock(sysInfoUser, pBlock, numOfRows, pDesc->pMeta); + code = setDescResultIntoDataBlock(sysInfoUser, pBlock, numOfRows, pDesc->pMeta); } if (TSDB_CODE_SUCCESS == code) { code = buildRetrieveTableRsp(pBlock, DESCRIBE_RESULT_COLS, pRsp); From 747b80cfd747a032865d8fe170fa11087af0f96d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 7 Jul 2023 09:25:12 +0800 Subject: [PATCH 17/24] add test check --- tests/system-test/0-others/compatibility.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py index b8623c3bcd..3fcff9b3de 100644 --- a/tests/system-test/0-others/compatibility.py +++ b/tests/system-test/0-others/compatibility.py @@ -151,10 +151,10 @@ class TDTestCase: os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '") os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql") - # cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;" - # tdLog.info(f"new client version connect to old version taosd, commad return value:{cmd}") - # if os.system(cmd) == 0: - # raise Exception("failed to execute system command. cmd: %s" % cmd) + cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;" + tdLog.info(f"new client version connect to old version taosd, commad return value:{cmd}") + if os.system(cmd) == 0: + raise Exception("failed to execute system command. cmd: %s" % cmd) os.system("pkill taosd") # make sure all the data are saved in disk. self.checkProcessPid("taosd") From 9c38e7c1b17fbcfde4d99f731cccfcdb77443d89 Mon Sep 17 00:00:00 2001 From: huolibo Date: Fri, 7 Jul 2023 09:43:36 +0800 Subject: [PATCH 18/24] docs(driver): jdbc 3.2.4 description --- docs/en/07-develop/07-tmq.mdx | 4 ---- docs/en/14-reference/03-connector/04-java.mdx | 7 ++++--- docs/zh/07-develop/07-tmq.mdx | 4 ---- docs/zh/08-connector/14-java.mdx | 1 + 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index f5e0378a00..506a8dcc46 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -81,10 +81,6 @@ Set subscription() throws SQLException; ConsumerRecords poll(Duration timeout) throws SQLException; -void commitAsync(); - -void commitAsync(OffsetCommitCallback callback); - void commitSync() throws SQLException; void close() throws SQLException; diff --git a/docs/en/14-reference/03-connector/04-java.mdx b/docs/en/14-reference/03-connector/04-java.mdx index e8c407b125..b68aeda94c 100644 --- a/docs/en/14-reference/03-connector/04-java.mdx +++ b/docs/en/14-reference/03-connector/04-java.mdx @@ -36,15 +36,16 @@ REST connection supports all platforms that can run Java. | taos-jdbcdriver version | major changes | TDengine version | | :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: | +| 3.2.4 | Subscription add the enable.auto.commit parameter and the unsubscribe() method in the WebSocket connection | 3.0.5.0 or later | | 3.2.3 | Fixed resultSet data parsing failure in some cases | 3.0.5.0 or later | -| 3.2.2 | subscription add seek function | 3.0.5.0 or later | +| 3.2.2 | Subscription add seek function | 3.0.5.0 or later | | 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later | | 3.2.0 | This version has been deprecated | - | | 3.1.0 | JDBC REST connection supports subscription over WebSocket | - | | 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment | - | | 3.0.0 | Support for TDengine 3.0 | 3.0.0.0 or later | -| 2.0.42 | fix wasNull interface return value in WebSocket connection | - | -| 2.0.41 | fix decode method of username and password in REST connection | - | +| 2.0.42 | Fix wasNull interface return value in WebSocket connection | - | +| 2.0.41 | Fix decode method of username and password in REST connection | - | | 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | - | | 2.0.38 | JDBC REST connections add bulk pull function | - | | 2.0.37 | Support json tags | - | diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index 54a8af2287..38b91d7cea 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -81,10 +81,6 @@ Set subscription() throws SQLException; ConsumerRecords poll(Duration timeout) throws SQLException; -void commitAsync(); - -void commitAsync(OffsetCommitCallback callback); - void commitSync() throws SQLException; void close() throws SQLException; diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx index c7da2bd4f5..96f8991eea 100644 --- a/docs/zh/08-connector/14-java.mdx +++ b/docs/zh/08-connector/14-java.mdx @@ -36,6 +36,7 @@ REST 连接支持所有能运行 Java 的平台。 | taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 | | :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: | +| 3.2.4 | 数据订阅在 WebSocket 连接下增加 enable.auto.commit 参数,以及 unsubscribe() 方法。 | - | | 3.2.3 | 修复 ResultSet 在一些情况数据解析失败 | - | | 3.2.2 | 新增功能:数据订阅支持 seek 功能。 | 3.0.5.0 及更高版本 | | 3.2.1 | 新增功能:WebSocket 连接支持 schemaless 与 prepareStatement 写入。变更:consumer poll 返回结果集为 ConsumerRecord,可通过 value() 获取指定结果集数据。 | 3.0.3.0 及更高版本 | From a99fac031cfc4d6485f40cfa7ab4c6c2eb1ea79c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 7 Jul 2023 10:25:48 +0800 Subject: [PATCH 19/24] fix: fix coverity scan issues --- source/client/src/clientHb.c | 6 +----- source/client/src/clientRawBlockWrite.c | 4 ++++ source/libs/catalog/src/catalog.c | 5 +---- source/libs/catalog/src/ctgCache.c | 4 +--- source/libs/catalog/src/ctgUtil.c | 1 - source/libs/parser/src/parTranslater.c | 5 ++++- source/libs/scheduler/src/schJob.c | 1 + source/libs/scheduler/src/schRemote.c | 1 + source/libs/scheduler/src/schTask.c | 1 - 9 files changed, 13 insertions(+), 15 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 04cb1821b0..54e3a6ee48 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -137,7 +137,6 @@ static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) { vgInfo->hashSuffix = rsp->hashSuffix; vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (NULL == vgInfo->vgHash) { - taosMemoryFree(vgInfo); tscError("hash init[%d] failed", rsp->vgNum); code = TSDB_CODE_OUT_OF_MEMORY; goto _return; @@ -147,8 +146,6 @@ static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) { SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j); if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) { tscError("hash push failed, errno:%d", errno); - taosHashCleanup(vgInfo->vgHash); - taosMemoryFree(vgInfo); code = TSDB_CODE_OUT_OF_MEMORY; goto _return; } @@ -486,7 +483,6 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { if (code) { taosArrayDestroy(desc.subDesc); desc.subDesc = NULL; - desc.subPlanNum = 0; } desc.subPlanNum = taosArrayGetSize(desc.subDesc); } else { @@ -592,7 +588,7 @@ static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClient code = TSDB_CODE_OUT_OF_MEMORY; goto _return; } - strncpy(user->user, pTscObj->user, TSDB_USER_LEN); + tstrncpy(user->user, pTscObj->user, TSDB_USER_LEN); user->version = htonl(-1); // force get userAuthInfo kv.valueLen = sizeof(SUserAuthVersion); kv.value = user; diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index cfc8ae9186..90b10e0920 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1286,6 +1286,10 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { taosArrayPush(pArray, &pVgData); pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + if (NULL == pQuery) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->msgType = TDMT_VND_ALTER_TABLE; pQuery->stableQuery = false; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index f736e9be98..f975517669 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -341,13 +341,10 @@ int32_t ctgChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, SUserAuthInfo *pReq, SCtgAuthReq req = {0}; req.pRawReq = pReq; req.pConn = pConn; - req.onlyCache = exists ? true : false; + req.onlyCache = false; CTG_ERR_RET(ctgGetUserDbAuthFromMnode(pCtg, pConn, pReq->user, &req.authInfo, NULL)); CTG_ERR_JRET(ctgChkSetAuthRes(pCtg, &req, &rsp)); - if (rsp.metaNotExists && exists) { - *exists = false; - } _return: diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index c856211635..605f5efeb4 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -1721,9 +1721,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName, meta->tableType); - if (pCache) { - CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache)); - } + CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index e7abbc5ead..86f6a51d9b 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -926,7 +926,6 @@ int32_t ctgGenerateVgList(SCatalog* pCtg, SHashObj* vgHash, SArray** pList) { } pIter = taosHashIterate(vgHash, pIter); - vgInfo = NULL; } *pList = vgList; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8fc4be5f95..c318e25fd9 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6044,6 +6044,9 @@ static int32_t checkCollectTopicTags(STranslateContext* pCxt, SCreateTopicStmt* // for (int32_t i = 0; i < pMeta->tableInfo.numOfColumns; ++i) { SSchema* column = &pMeta->schema[0]; SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == col) { + return TSDB_CODE_OUT_OF_MEMORY; + } strcpy(col->colName, column->name); strcpy(col->node.aliasName, col->colName); strcpy(col->node.userAlias, col->colName); @@ -6154,7 +6157,7 @@ static int32_t translateAlterLocal(STranslateContext* pCxt, SAlterLocalStmt* pSt char* p = strchr(pStmt->config, ' '); if (NULL != p) { *p = 0; - strcpy(pStmt->value, p + 1); + tstrncpy(pStmt->value, p + 1, sizeof(pStmt->value)); } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index e7bfe95795..78e0807775 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -135,6 +135,7 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { break; case JOB_TASK_STATUS_DROP: SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); + break; default: SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus)); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 80fdc7594c..01b4e7e9e6 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -392,6 +392,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD // NEVER REACH HERE SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + break; } case TDMT_SCH_LINK_BROKEN: SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode)); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 78e28bce49..adb0c111cf 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -961,7 +961,6 @@ int32_t schHandleExplainRes(SArray *pExplainRes) { localRsp->rsp.numOfPlans = 0; localRsp->rsp.subplanInfo = NULL; pTask = NULL; - pJob = NULL; } _return: From 4bebd5c0dab10e93e6793142768da37bdc4c8b29 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 7 Jul 2023 17:19:06 +0800 Subject: [PATCH 20/24] docs:add info for INS_SUBSCRIPTIONS --- docs/en/12-taos-sql/22-meta.md | 2 ++ docs/zh/12-taos-sql/22-meta.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/docs/en/12-taos-sql/22-meta.md b/docs/en/12-taos-sql/22-meta.md index 4123bdfb58..f165470d10 100644 --- a/docs/en/12-taos-sql/22-meta.md +++ b/docs/en/12-taos-sql/22-meta.md @@ -283,6 +283,8 @@ Provides dnode configuration information. | 2 | consumer_group | BINARY(193) | Subscribed consumer group | | 3 | vgroup_id | INT | Vgroup ID for the consumer | | 4 | consumer_id | BIGINT | Consumer ID | +| 5 | offset | BINARY(64) | Consumption progress | +| 6 | rows | BIGINT | Number of consumption items | ## INS_STREAMS diff --git a/docs/zh/12-taos-sql/22-meta.md b/docs/zh/12-taos-sql/22-meta.md index 3fffbd0706..fe8d6d4c69 100644 --- a/docs/zh/12-taos-sql/22-meta.md +++ b/docs/zh/12-taos-sql/22-meta.md @@ -284,6 +284,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | 2 | consumer_group | BINARY(193) | 订阅者的消费者组 | | 3 | vgroup_id | INT | 消费者被分配的 vgroup id | | 4 | consumer_id | BIGINT | 消费者的唯一 id | +| 5 | offset | BINARY(64) | 消费者的消费进度 | +| 6 | rows | BIGINT | 消费者的消费的数据条数 | ## INS_STREAMS From bcd1b0894f3aaf20dff733cf6d73a7b9b3bf5ce3 Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Fri, 7 Jul 2023 18:41:21 +0800 Subject: [PATCH 21/24] update package test script --- packaging/checkPackageRuning.py | 4 ++-- packaging/testpackage.sh | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packaging/checkPackageRuning.py b/packaging/checkPackageRuning.py index 96e2378fb3..914ee83f29 100755 --- a/packaging/checkPackageRuning.py +++ b/packaging/checkPackageRuning.py @@ -87,7 +87,7 @@ os.system("rm -rf /tmp/dumpdata/*") # dump data out print("taosdump dump out data") -os.system("taosdump -o /tmp/dumpdata -D test -y -h %s "%serverHost) +os.system("taosdump -o /tmp/dumpdata -D test -h %s "%serverHost) # drop database of test print("drop database test") @@ -95,7 +95,7 @@ os.system(" taos -s ' drop database test ;' -h %s "%serverHost) # dump data in print("taosdump dump data in") -os.system("taosdump -i /tmp/dumpdata -y -h %s "%serverHost) +os.system("taosdump -i /tmp/dumpdata -h %s "%serverHost) result = conn.query("SELECT count(*) from test.meters") diff --git a/packaging/testpackage.sh b/packaging/testpackage.sh index 081383f89b..0622b01f2b 100755 --- a/packaging/testpackage.sh +++ b/packaging/testpackage.sh @@ -152,7 +152,7 @@ function wgetFile { file=$1 versionPath=$2 sourceP=$3 -nasServerIP="192.168.1.131" +nasServerIP="192.168.1.213" packagePath="/nas/TDengine/v${versionPath}/${verMode}" if [ -f ${file} ];then echoColor YD "${file} already exists ,it will delete it and download it again " From 4436eb7e0f639c12b6619ccbe9b23da94cd6e55a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 7 Jul 2023 19:12:18 +0800 Subject: [PATCH 22/24] fix:set commitOffset is send msg success & optimize log & add test cases for TD-25129 --- source/client/src/clientTmq.c | 4 +- source/client/test/clientTests.cpp | 140 +++++++++++++++++++++++++++++ source/dnode/vnode/src/tq/tq.c | 4 +- 3 files changed, 145 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3fbde36e89..5fd79a2711 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -678,6 +678,8 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm taosMemoryFree(pParamSet); pCommitFp(tmq, code, userParam); } + // update the offset value. + pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset; } else { // do not perform commit, callback user function directly. taosMemoryFree(pParamSet); pCommitFp(tmq, code, userParam); @@ -2712,7 +2714,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a // char offsetBuf[TSDB_OFFSET_LEN] = {0}; // tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); - tscInfo("vgId:%d offset is old to:%"PRId64, p->vgId, p->currentOffset); + tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset); pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerEnd = p->end; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index ccc17289b0..3c46d17802 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1073,6 +1073,146 @@ TEST(clientCase, sub_db_test) { fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); } +TEST(clientCase, td_25129) { +// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); + + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + tmq_conf_t* conf = tmq_conf_new(); + + tmq_conf_set(conf, "enable.auto.commit", "false"); + tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); + tmq_conf_set(conf, "group.id", "group_id_2"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "tp"); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + TAOS_FIELD* fields = NULL; + int32_t numOfFields = 0; + int32_t precision = 0; + int32_t totalRows = 0; + int32_t msgCnt = 0; + int32_t timeout = 2000; + + int32_t count = 0; + + tmq_topic_assignment* pAssign = NULL; + int32_t numOfAssign = 0; + + int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + +// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4); + tmq_free_assignment(pAssign); + + code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + + tmq_free_assignment(pAssign); + + code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + + while (1) { + TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); + if (pRes) { + char buf[128]; + + const char* topicName = tmq_get_topic_name(pRes); +// const char* dbName = tmq_get_db_name(pRes); +// int32_t vgroupId = tmq_get_vgroup_id(pRes); +// +// printf("topic: %s\n", topicName); +// printf("db: %s\n", dbName); +// printf("vgroup id: %d\n", vgroupId); + + printSubResults(pRes, &totalRows); + } else { + tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset); + tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset); + continue; + } + +// tmq_commit_sync(tmq, pRes); + if (pRes != NULL) { + taos_free_result(pRes); + // if ((++count) > 1) { + // break; + // } + } else { + break; + } + +// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin); + } + + tmq_free_assignment(pAssign); + + code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); +} + TEST(clientCase, sub_tb_test) { taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2d757fff08..c310ca4d40 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -490,7 +490,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (!exec) { tqSetHandleExec(pHandle); // qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, + tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle); taosWUnLockLatch(&pTq->lock); break; @@ -518,7 +518,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); tqSetHandleIdle(pHandle); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, + tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle); return code; } From ab4e2ebc3ab4458df14831ab520fd4289eb28d9d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 7 Jul 2023 21:45:22 +0800 Subject: [PATCH 23/24] fix(stream): fix bug that causes the endless loop. --- source/libs/stream/src/streamExec.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 051c664c53..89fcecda39 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -403,6 +403,10 @@ int32_t streamExecForAll(SStreamTask* pTask) { // wait for the task to be ready to go while (pTask->taskLevel == TASK_LEVEL__SOURCE) { int8_t status = atomic_load_8(&pTask->status.taskStatus); + if (status == TASK_STATUS__DROPPING) { + break; + } + if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, status); taosMsleep(100); From f9f656b21a617d83a35b17acf91a1af040660921 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 8 Jul 2023 13:25:39 +0800 Subject: [PATCH 24/24] fix:return offset stored if get assignment in init mode --- source/dnode/vnode/src/tq/tq.c | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c310ca4d40..0c5ccab6a0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -571,10 +571,26 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { if (reqOffset.type == TMQ_OFFSET__LOG) { dataRsp.rspOffset.version = reqOffset.version; - } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { - dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position - } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { - dataRsp.rspOffset.version = ever; + } else if(reqOffset.type < 0){ + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey); + if (pOffset != NULL) { + if (pOffset->val.type != TMQ_OFFSET__LOG) { + tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey); + terrno = TSDB_CODE_INVALID_PARA; + tDeleteMqDataRsp(&dataRsp); + return -1; + } + + dataRsp.rspOffset.version = pOffset->val.version; + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); + }else{ + if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { + dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position + } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { + dataRsp.rspOffset.version = ever; + } + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); + } } else { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey, reqOffset.type); @@ -582,7 +598,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { tDeleteMqDataRsp(&dataRsp); return -1; } - tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever); tDeleteMqDataRsp(&dataRsp);