diff --git a/docs/zh/14-reference/01-components/01-taosd.md b/docs/zh/14-reference/01-components/01-taosd.md index 004116b478..ff6b27092d 100644 --- a/docs/zh/14-reference/01-components/01-taosd.md +++ b/docs/zh/14-reference/01-components/01-taosd.md @@ -154,6 +154,12 @@ charset 的有效值是 UTF-8。 | :-----------: | :-------------------------------------------------------------------------: | | supportVnodes | dnode 支持的最大 vnode 数目,取值范围:0-4096,缺省值: CPU 核数的 2 倍 + 5 | +### 内存相关 +| 参数名称 | 参数说明 | +| :----------------: | :---------------------------------------------: | +| rpcQueueMemoryAllowed | 一个 dnode 允许的 rpc 消息占用的内存最大值,单位 bytes,取值范围:10485760-INT64_MAX,缺省值:服务器内存的 1/10 | +| syncLogBufferMemoryAllowed | 一个 dnode 允许的 sync 日志缓存消息占用的内存最大值,单位 bytes,取值范围:10485760-INT64_MAX,缺省值:服务器内存的 1/10,3.1.3.2/3.3.2.13 版本开始生效 | + ### 性能调优 | 参数名称 | 参数说明 | diff --git a/docs/zh/14-reference/03-taos-sql/22-meta.md b/docs/zh/14-reference/03-taos-sql/22-meta.md index 35b71feb2c..397b3d3362 100644 --- a/docs/zh/14-reference/03-taos-sql/22-meta.md +++ b/docs/zh/14-reference/03-taos-sql/22-meta.md @@ -93,8 +93,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | 4 | vgroups | INT | 数据库中有多少个 vgroup。需要注意,`vgroups` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 6 | replica | INT | 副本数。需要注意,`replica` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 7 | strict | VARCHAR(4) | 废弃参数 | -| 8 | duration | VARCHAR(10) | 单文件存储数据的时间跨度。需要注意,`duration` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 9 | keep | VARCHAR(32) | 数据保留时长。需要注意,`keep` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 8 | duration | VARCHAR(10) | 单文件存储数据的时间跨度。需要注意,`duration` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。内部存储单位为分钟,查询时有可能转换为天或小时展示 | +| 9 | keep | VARCHAR(32) | 数据保留时长。需要注意,`keep` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 内部存储单位为分钟,查询时有可能转换为天或小时展示 | | 10 | buffer | INT | 每个 vnode 写缓存的内存块大小,单位 MB。需要注意,`buffer` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 11 | pagesize | INT | 每个 VNODE 中元数据存储引擎的页大小,单位为 KB。需要注意,`pagesize` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 12 | pages | INT | 每个 vnode 元数据存储引擎的缓存页个数。需要注意,`pages` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 2d4d437649..bece14c17d 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -94,6 +94,7 @@ extern int32_t tsElectInterval; extern int32_t tsHeartbeatInterval; extern int32_t tsHeartbeatTimeout; extern int32_t tsSnapReplMaxWaitN; +extern int64_t tsLogBufferMemoryAllowed; // maximum allowed log buffer size in bytes for each dnode // arbitrator extern int32_t tsArbHeartBeatIntervalSec; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index eb329192a2..ba0930955c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -966,6 +966,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013) #define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014) #define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015) +#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/include/util/tdef.h b/include/util/tdef.h index 46a0d01457..46c84ab26a 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -332,6 +332,7 @@ typedef enum ELogicConditionType { #define TSDB_MAX_LEARNER_REPLICA 10 #define TSDB_SYNC_LOG_BUFFER_SIZE 4096 #define TSDB_SYNC_LOG_BUFFER_RETENTION 256 +#define TSDB_SYNC_LOG_BUFFER_THRESHOLD (1024 * 1024 * 5) #define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512 #define TSDB_SYNC_NEGOTIATION_WIN 512 diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 9b0a9299bf..5bab536332 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -77,7 +77,10 @@ bool chkRequestKilled(void* param) { return killed; } -void cleanupAppInfo() { taosHashCleanup(appInfo.pInstMap); } +void cleanupAppInfo() { + taosHashCleanup(appInfo.pInstMap); + taosHashCleanup(appInfo.pInstMapByClusterId); +} static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index cc7a7f9489..d75d713028 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -73,6 +73,8 @@ void taos_cleanup(void) { tscWarn("failed to cleanup task queue"); } + tmqMgmtClose(); + int32_t id = clientReqRefPool; clientReqRefPool = -1; taosCloseRef(id); @@ -87,9 +89,6 @@ void taos_cleanup(void) { tscDebug("rpc cleanup"); taosConvDestroy(); - - tmqMgmtClose(); - DestroyRegexCache(); tscInfo("all local resources released"); diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index ce99cd7deb..feaed803df 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -1889,7 +1889,12 @@ int stmtGetParamTbName(TAOS_STMT2* stmt, int* nums) { STMT_ERR_RET(stmtParseSql(pStmt)); } - *nums = STMT_TYPE_MULTI_INSERT == pStmt->sql.type ? 1 : 0; + if (TSDB_CODE_TSC_STMT_TBNAME_ERROR == pStmt->errCode) { + *nums = 1; + pStmt->errCode = TSDB_CODE_SUCCESS; + } else { + *nums = STMT_TYPE_MULTI_INSERT == pStmt->sql.type ? 1 : 0; + } return TSDB_CODE_SUCCESS; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0cc38a3a12..9e1c7b2b88 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2906,7 +2906,10 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { tmqFreeRspWrapper((SMqRspWrapper*)pWrapper); taosFreeQitem(pWrapper); } else { - (void)taosWriteQitem(tmq->mqueue, pWrapper); + if (taosWriteQitem(tmq->mqueue, pWrapper) != 0){ + tmqFreeRspWrapper((SMqRspWrapper*)pWrapper); + taosFreeQitem(pWrapper); + } } } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 5b67e1267b..5880a5e7e7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -81,6 +81,7 @@ int32_t tsElectInterval = 25 * 1000; int32_t tsHeartbeatInterval = 1000; int32_t tsHeartbeatTimeout = 20 * 1000; int32_t tsSnapReplMaxWaitN = 128; +int64_t tsLogBufferMemoryAllowed = 0; // bytes // mnode int64_t tsMndSdbWriteDelta = 200; @@ -613,7 +614,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, CFG_SCOPE_BOTH, CFG_DYN_NONE)); tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_ENT_BOTH)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_NONE)); tsNumOfTaskQueueThreads = tsNumOfCores * 2; tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 16); @@ -702,6 +703,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); + tsLogBufferMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; + tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); + // clang-format off TAOS_CHECK_RETURN(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE)); @@ -736,6 +740,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE)); + TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "syncLogBufferMemoryAllowed", tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbHeartBeatIntervalSec", tsArbHeartBeatIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbCheckSyncIntervalSec", tsArbCheckSyncIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); @@ -970,6 +975,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } + pItem = cfgGetItem(tsCfg, "syncLogBufferMemoryAllowed"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsLogBufferMemoryAllowed = totalMemoryKB * 1024 * 0.1; + tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); + pItem->i64 = tsLogBufferMemoryAllowed; + pItem->stype = stype; + } + TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -1520,6 +1533,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncSnapReplMaxWaitN"); tsSnapReplMaxWaitN = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncLogBufferMemoryAllowed"); + tsLogBufferMemoryAllowed = pItem->i64; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbHeartBeatIntervalSec"); tsArbHeartBeatIntervalSec = pItem->i32; @@ -1954,6 +1970,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"randErrorChance", &tsRandErrChance}, {"randErrorDivisor", &tsRandErrDivisor}, {"randErrorScope", &tsRandErrScope}, + {"syncLogBufferMemoryAllowed", &tsLogBufferMemoryAllowed}, {"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold}, {"checkpointInterval", &tsStreamCheckpointInterval}, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 758a4aeec3..4aed628a92 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -10346,7 +10346,11 @@ int32_t tDecodeSMCreateStbRsp(SDecoder *pDecoder, SMCreateStbRsp *pRsp) { } tEndDecode(pDecoder); + return code; + _exit: + tFreeSTableMetaRsp(pRsp->pMeta); + taosMemoryFreeClear(pRsp->pMeta); return code; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index f71ab95d03..469ae4137c 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -511,6 +511,11 @@ static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerOb } } } + // no topics need to be rebalanced + if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { + code = TSDB_CODE_TMQ_NO_NEED_REBALANCE; + } + END: return code; } @@ -581,6 +586,10 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { if(ubSubscribe){ SMqConsumerObj *pConsumerTmp = NULL; MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp)); + if (taosArrayGetSize(pConsumerTmp->assignedTopics) == 0){ + mndReleaseConsumer(pMnode, pConsumerTmp); + goto END; + } mndReleaseConsumer(pMnode, pConsumerTmp); } MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames)); @@ -599,7 +608,7 @@ END: mndTransDrop(pTrans); tDeleteSMqConsumerObj(pConsumerNew); taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); - return code; + return (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) ? 0 : code; } SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 2b32dc7781..e73cc1b5db 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -543,14 +543,14 @@ void mndSyncCheckTimeout(SMnode *pMnode) { if (delta > MNODE_TIMEOUT_SEC) { mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId, pMgmt->transSec, curSec, delta, pMgmt->transSeq); - pMgmt->transId = 0; - pMgmt->transSec = 0; - pMgmt->transSeq = 0; - terrno = TSDB_CODE_SYN_TIMEOUT; - pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT; - if (tsem_post(&pMgmt->syncSem) < 0) { - mError("failed to post sem"); - } + // pMgmt->transId = 0; + // pMgmt->transSec = 0; + // pMgmt->transSeq = 0; + // terrno = TSDB_CODE_SYN_TIMEOUT; + // pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT; + //if (tsem_post(&pMgmt->syncSem) < 0) { + // mError("failed to post sem"); + //} } else { mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId, pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index b24aa6fb1d..72f2052867 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -625,7 +625,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } double cost = (taosGetTimestampUs() - st) / 1000.0; if (cost > tsCacheLazyLoadThreshold) { - pr->lastTs = totalLastTs; + // pr->lastTs = totalLastTs; } } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 25e635330d..4369b1df54 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -667,8 +667,8 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int val = *pVal; } else { pCache->cacheHit += 1; - STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h); - val = *pVal; + STableCachedVal* pValTmp = taosLRUCacheValue(pCache->pTableMetaEntryCache, h); + val = *pValTmp; bool bRes = taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false); qTrace("release LRU cache, res %d", bRes); @@ -720,12 +720,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) { taosMemoryFree(data); } - if (code) { - if (freeReader) { - pHandle->api.metaReaderFn.clearReader(&mr); - } - return code; - } + QUERY_CHECK_CODE(code, lino, _end); } else { // todo opt for json tag for (int32_t i = 0; i < pBlock->info.rows; ++i) { code = colDataSetVal(pColInfoData, i, data, false); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 009854b45b..0b653ddbe9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "sync.h" -#include "taosdef.h" +#include "tglobal.h" #include "trpc.h" #include "ttimer.h" diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index ea85b796d5..427a3690f2 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -54,6 +54,7 @@ typedef struct SSyncLogBuffer { int64_t matchIndex; int64_t endIndex; int64_t size; + int64_t bytes; TdThreadMutex mutex; TdThreadMutexAttr attr; int64_t totalIndex; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 86d4d83d29..d042bf8ade 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -28,6 +28,8 @@ #include "syncUtil.h" #include "syncVoteMgr.h" +static int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer + static bool syncIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM); @@ -101,6 +103,10 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term}; pBuf->entries[index % pBuf->size] = tmp; pBuf->endIndex = index + 1; + if (pNode->vgId > 1) { + pBuf->bytes += pEntry->bytes; + (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + } (void)taosThreadMutexUnlock(&pBuf->mutex); TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); @@ -260,6 +266,10 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1}; pBuf->entries[index % pBuf->size] = tmp; taken = true; + if (pNode->vgId > 1) { + pBuf->bytes += pEntry->bytes; + (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + } } if (index < toIndex) { @@ -286,6 +296,10 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { } SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm}; pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp; + if (pNode->vgId > 1) { + pBuf->bytes += pDummy->bytes; + (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes); + } if (index < toIndex) { pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex; @@ -330,6 +344,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { (void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); } pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0; + pBuf->bytes = 0; int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode); if (code < 0) { sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code)); @@ -470,8 +485,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt goto _out; } SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm}; - pEntry = NULL; pBuf->entries[index % pBuf->size] = tmp; + if (pNode->vgId > 1) { + pBuf->bytes += pEntry->bytes; + (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + } + pEntry = NULL; // update end index pBuf->endIndex = TMAX(index + 1, pBuf->endIndex); @@ -846,14 +865,34 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } // recycle + bool isVnode = pNode->vgId > 1; SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION; - for (SyncIndex index = pBuf->startIndex; index < until; index++) { - SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; - if (pEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR; + do { + if ((pBuf->startIndex >= pBuf->commitIndex) || + !((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD && + atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) { + break; + } + SSyncRaftEntry* pEntry = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem; + if (pEntry == NULL) { + sError("vgId:%d, invalid log entry to recycle. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64 + ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64, + pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (isVnode) { + pBuf->bytes -= pEntry->bytes; + (void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + } + sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64 + ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64 + ", used:%" PRId64 ", allowed:%" PRId64, + pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term, + pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed), tsLogBufferMemoryAllowed); syncEntryDestroy(pEntry); - (void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); - pBuf->startIndex = index + 1; - } + (void)memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); + ++pBuf->startIndex; + } while (true); code = 0; _out: @@ -1324,6 +1363,7 @@ void syncLogBufferClear(SSyncLogBuffer* pBuf) { (void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); } pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0; + pBuf->bytes = 0; (void)taosThreadMutexUnlock(&pBuf->mutex); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 5ceec33831..ccf30438bd 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -806,6 +806,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") diff --git a/tests/army/query/function/ans/rand.csv b/tests/army/query/function/ans/rand.csv new file mode 100644 index 0000000000..40e20c5ba4 --- /dev/null +++ b/tests/army/query/function/ans/rand.csv @@ -0,0 +1,21 @@ +0.663936012733698 +0.840187717154710 +0.840187717154710 +0.700976369297587 +0.561380175203728 +0.916457875592847 +0.274745596235034 +0.135438768721856 +0.486904139391568 +0.352760728612896 +0.206965447965528 +0.419929514834624 +0.419929514834624 +0.419929514834624 +0.419929514834624 +0.419929514834624 +0.419929514834624 +0.419929514834624 +0.419929514834624 +0.419929514834624 +0.419929514834624 \ No newline at end of file diff --git a/tests/army/query/function/in/rand.in b/tests/army/query/function/in/rand.in new file mode 100644 index 0000000000..185a76d6f9 --- /dev/null +++ b/tests/army/query/function/in/rand.in @@ -0,0 +1,3 @@ +select RAND(1245); +select RAND(id) from ts_4893.d0 limit 10; +select RAND(id) from ts_4893.d0 order by id desc limit 10; \ No newline at end of file diff --git a/tests/army/query/function/test_function.py b/tests/army/query/function/test_function.py index 4981e93563..18a0d46711 100644 --- a/tests/army/query/function/test_function.py +++ b/tests/army/query/function/test_function.py @@ -509,31 +509,98 @@ class TDTestCase(TBase): tdSql.error( "select * from (select to_iso8601(ts, timezone()), timezone() from meters order by ts desc) limit 1000;", expectErrInfo="Not supported timzone format") # TS-5340 + + def test_rand(self): + self.test_normal_query("rand") + + tdSql.query("select rand();") + tdSql.checkRows(1) + tdSql.checkCols(1) + self.check_result_in_range(0, 0) + + tdSql.query("select rand(null);") + tdSql.checkRows(1) + tdSql.checkCols(1) + self.check_result_in_range(0, 0) + + tdSql.query("select rand() from (select 1) t limit 1;") + tdSql.checkRows(1) + tdSql.checkCols(1) + self.check_result_in_range(0, 0) + + tdSql.query("select rand(id) from ts_4893.d0 limit 100;") + tdSql.checkRows(100) + tdSql.checkCols(1) + for i in range(len(tdSql.res)): + self.check_result_in_range(i, 0) + + tdSql.query("select rand(id) from ts_4893.meters limit 100;") + tdSql.checkRows(100) + tdSql.checkCols(1) + for i in range(len(tdSql.res)): + self.check_result_in_range(i, 0) + + tdSql.query("select rand(123), rand(123);") + tdSql.checkRows(1) + tdSql.checkCols(2) + if tdSql.res[0][0] != tdSql.res[0][1]: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno, tdSql.sql, tdSql.res[0][0], tdSql.res[0][1]) + tdLog.exit("%s(%d) failed: sql:%s data1:%s ne data2:%s" % args) + + def check_result_in_range(self, row, col): + res = tdSql.res[row][col] + if res < 0 or res >= 1: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno, tdSql.sql, row, col, res) + tdLog.exit("%s(%d) failed: sql:%s row:%s col:%s data:%s lt 0 or ge 1" % args) + + def test_max(self): + self.test_normal_query("max") + + tdSql.query("select max(null) from ts_4893.meters;") + tdSql.checkRows(1) + tdSql.checkCols(1) + tdSql.checkData(0, 0, 'None') + + tdSql.query("select max(id) from ts_4893.meters;") + tdSql.checkRows(1) + + tdSql.query("select max(name) from ts_4893.meters;") + tdSql.checkRows(1) + + tdSql.query("select max(current) from ts_4893.meters;") + tdSql.checkRows(1) + + tdSql.query("select max(nch1) from ts_4893.meters;") + tdSql.checkRows(1) + + tdSql.query("select max(var1) from ts_4893.meters;") + tdSql.checkRows(1) + def test_min(self): self.test_normal_query("min") - tdSql.query("select min(var1), min(id) from ts_4893.d0;") + tdSql.query("select min(null) from ts_4893.meters;") tdSql.checkRows(1) - tdSql.checkData(0, 0, 'abc一二三abc一二三abc') - tdSql.checkData(0, 1, 0) - def test_max(self): - self.test_normal_query("max") - tdSql.query("select max(var1), max(id) from ts_4893.d0;") - tdSql.checkRows(1) - tdSql.checkData(0, 0, '一二三四五六七八九十') - tdSql.checkData(0, 1, 9999) - def test_rand(self): - tdSql.query("select rand();") + tdSql.checkCols(1) + tdSql.checkData(0, 0, 'None') + + tdSql.query("select min(id) from ts_4893.meters;") tdSql.checkRows(1) - tdSql.query("select rand(1);") + tdSql.query("select min(name) from ts_4893.meters;") tdSql.checkRows(1) - tdSql.query("select rand(1) from ts_4893.meters limit 10;") - tdSql.checkRows(10) + tdSql.query("select min(current) from ts_4893.meters;") + tdSql.checkRows(1) + + tdSql.query("select min(nch1) from ts_4893.meters;") + tdSql.checkRows(1) + + tdSql.query("select min(var1) from ts_4893.meters;") + tdSql.checkRows(1) - tdSql.query("select rand(id) from ts_4893.d0 limit 10;") - tdSql.checkRows(10) # run def run(self): tdLog.debug(f"start to excute {__file__}") @@ -576,8 +643,8 @@ class TDTestCase(TBase): self.test_varpop() # select function - self.test_min() self.test_max() + self.test_min() # error function self.test_error() diff --git a/tests/army/query/sys/tb_perf_queries_exist_test.py b/tests/army/query/sys/tb_perf_queries_exist_test.py new file mode 100644 index 0000000000..e6afc0bec6 --- /dev/null +++ b/tests/army/query/sys/tb_perf_queries_exist_test.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- + +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame import * +from frame.autogen import * + +''' + TS-5349: https://jira.taosdata.com:18080/browse/TS-5349 + 查询 performance_schema.perf_queries 后, 再查询 information_schema.perf_queries, + 正常情况下在 information_schema 中不存在表 perf_queries +''' + +class TDTestCase(TBase): + + def run(self): + tdSql.query("select * from performance_schema.perf_queries;") + tdLog.info("Table [perf_queries] exist in schema [performance_schema]") + + tdSql.error("select * from information_schema.perf_queries;") + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 76ec6422b9..04efaf92c4 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -42,6 +42,7 @@ ,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_compare_asc_desc.py ,,y,army,./pytest.sh python3 ./test.py -f query/last/test_last.py ,,y,army,./pytest.sh python3 ./test.py -f query/window/base.py +,,y,army,./pytest.sh python3 ./test.py -f query/sys/tb_perf_queries_exist_test.py -N 3 # # system test