From 883d2bfb7ab248ae8b932ad2d79e45809f0d5a74 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 10 Sep 2024 14:54:00 +0800 Subject: [PATCH 01/25] fix: oom with large submit --- source/libs/sync/src/syncPipeline.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index a6e9c7de32..4cdc6c3d83 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -846,7 +846,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } // recycle - SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION; + SyncIndex until = pBuf->commitIndex; // - TSDB_SYNC_LOG_BUFFER_RETENTION; for (SyncIndex index = pBuf->startIndex; index < until; index++) { SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; if (pEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR; From 1850f3fb5227f62a34591b7e2c91efeab08be5c1 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 11 Sep 2024 11:51:02 +0800 Subject: [PATCH 02/25] fix:[TD-31962]memory leak by crash_gen --- include/util/taoserror.h | 1 + source/client/src/clientEnv.c | 2 +- source/client/src/clientTmq.c | 5 ++++- source/common/src/tmsg.c | 4 ++++ source/dnode/mnode/impl/src/mndConsumer.c | 11 ++++++++++- source/libs/executor/src/scanoperator.c | 12 ++++-------- source/util/src/terror.c | 1 + 7 files changed, 25 insertions(+), 11 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 4591c7fbcc..16027730f7 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -962,6 +962,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013) #define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014) #define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015) +#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index fec1060042..101bd9341c 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -941,7 +941,7 @@ void taos_init_imp(void) { tscInitRes = TSDB_CODE_OUT_OF_MEMORY; return; } - taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst); +// taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst); avoid heap use after free deltaToUtcInitOnce(); char logDirName[64] = {0}; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 783815d97f..0395869367 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2896,7 +2896,10 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { tmqFreeRspWrapper((SMqRspWrapper*)pWrapper); taosFreeQitem(pWrapper); } else { - (void)taosWriteQitem(tmq->mqueue, pWrapper); + if (taosWriteQitem(tmq->mqueue, pWrapper) != 0){ + tmqFreeRspWrapper((SMqRspWrapper*)pWrapper); + taosFreeQitem(pWrapper); + } } } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 758a4aeec3..4aed628a92 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -10346,7 +10346,11 @@ int32_t tDecodeSMCreateStbRsp(SDecoder *pDecoder, SMCreateStbRsp *pRsp) { } tEndDecode(pDecoder); + return code; + _exit: + tFreeSTableMetaRsp(pRsp->pMeta); + taosMemoryFreeClear(pRsp->pMeta); return code; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 606b93035f..403a5b89a8 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -511,6 +511,11 @@ static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerOb } } } + // no topics need to be rebalanced + if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { + code = TSDB_CODE_TMQ_NO_NEED_REBALANCE; + } + END: return code; } @@ -581,6 +586,10 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { if(ubSubscribe){ SMqConsumerObj *pConsumerTmp = NULL; MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp)); + if (taosArrayGetSize(pConsumerTmp->assignedTopics) == 0){ + mndReleaseConsumer(pMnode, pConsumerTmp); + goto END; + } mndReleaseConsumer(pMnode, pConsumerTmp); } MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames)); @@ -599,7 +608,7 @@ END: mndTransDrop(pTrans); tDeleteSMqConsumerObj(pConsumerNew); taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); - return code; + return (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) ? 0 : code; } SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 42e7e4ac3b..717b8793f2 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -668,8 +668,8 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int val = *pVal; } else { pCache->cacheHit += 1; - STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h); - val = *pVal; + STableCachedVal* pValTmp = taosLRUCacheValue(pCache->pTableMetaEntryCache, h); + val = *pValTmp; bool bRes = taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false); qTrace("release LRU cache, res %d", bRes); @@ -721,12 +721,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) { taosMemoryFree(data); } - if (code) { - if (freeReader) { - pHandle->api.metaReaderFn.clearReader(&mr); - } - return code; - } + QUERY_CHECK_CODE(code, lino, _end); } else { // todo opt for json tag for (int32_t i = 0; i < pBlock->info.rows; ++i) { code = colDataSetVal(pColInfoData, i, data, false); @@ -746,6 +741,7 @@ _end: sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW, NULL); if (insertRet != TAOS_LRU_STATUS_OK) { qWarn("failed to put meta into lru cache, code:%d, %s", insertRet, idStr); + freeTableCachedVal(pVal); } } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 58dde5cd23..f85c76f157 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -806,6 +806,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") From 2a13aa08e15c4962b154cc0b9675932c55d074d7 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 11 Sep 2024 14:23:47 +0800 Subject: [PATCH 03/25] fix: oom with large msg --- include/common/tglobal.h | 1 + include/util/tdef.h | 1 + source/common/src/tglobal.c | 17 ++++++++++ source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/inc/syncPipeline.h | 1 + source/libs/sync/src/syncPipeline.c | 50 ++++++++++++++++++++++++----- 6 files changed, 63 insertions(+), 9 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 2d4d437649..bece14c17d 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -94,6 +94,7 @@ extern int32_t tsElectInterval; extern int32_t tsHeartbeatInterval; extern int32_t tsHeartbeatTimeout; extern int32_t tsSnapReplMaxWaitN; +extern int64_t tsLogBufferMemoryAllowed; // maximum allowed log buffer size in bytes for each dnode // arbitrator extern int32_t tsArbHeartBeatIntervalSec; diff --git a/include/util/tdef.h b/include/util/tdef.h index 46a0d01457..46c84ab26a 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -332,6 +332,7 @@ typedef enum ELogicConditionType { #define TSDB_MAX_LEARNER_REPLICA 10 #define TSDB_SYNC_LOG_BUFFER_SIZE 4096 #define TSDB_SYNC_LOG_BUFFER_RETENTION 256 +#define TSDB_SYNC_LOG_BUFFER_THRESHOLD (1024 * 1024 * 5) #define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512 #define TSDB_SYNC_NEGOTIATION_WIN 512 diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 5b67e1267b..2519ead06e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -81,6 +81,7 @@ int32_t tsElectInterval = 25 * 1000; int32_t tsHeartbeatInterval = 1000; int32_t tsHeartbeatTimeout = 20 * 1000; int32_t tsSnapReplMaxWaitN = 128; +int64_t tsLogBufferMemoryAllowed = 0; // bytes // mnode int64_t tsMndSdbWriteDelta = 200; @@ -702,6 +703,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); + tsLogBufferMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; + tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); + // clang-format off TAOS_CHECK_RETURN(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE)); @@ -736,6 +740,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE)); + TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "syncLogBufferMemoryAllowed", tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbHeartBeatIntervalSec", tsArbHeartBeatIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbCheckSyncIntervalSec", tsArbCheckSyncIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); @@ -970,6 +975,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } + pItem = cfgGetItem(tsCfg, "syncLogBufferMemoryAllowed"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsLogBufferMemoryAllowed = totalMemoryKB * 1024 * 0.1; + tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); + pItem->i64 = tsLogBufferMemoryAllowed; + pItem->stype = stype; + } + TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -1520,6 +1533,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncSnapReplMaxWaitN"); tsSnapReplMaxWaitN = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncLogBufferMemoryAllowed"); + tsLogBufferMemoryAllowed = pItem->i64; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbHeartBeatIntervalSec"); tsArbHeartBeatIntervalSec = pItem->i32; @@ -1954,6 +1970,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"randErrorChance", &tsRandErrChance}, {"randErrorDivisor", &tsRandErrDivisor}, {"randErrorScope", &tsRandErrScope}, + {"syncLogBufferMemoryAllowed", &tsLogBufferMemoryAllowed}, {"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold}, {"checkpointInterval", &tsStreamCheckpointInterval}, diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 009854b45b..0b653ddbe9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "sync.h" -#include "taosdef.h" +#include "tglobal.h" #include "trpc.h" #include "ttimer.h" diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index ea85b796d5..427a3690f2 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -54,6 +54,7 @@ typedef struct SSyncLogBuffer { int64_t matchIndex; int64_t endIndex; int64_t size; + int64_t bytes; TdThreadMutex mutex; TdThreadMutexAttr attr; int64_t totalIndex; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 4cdc6c3d83..5d20f9290b 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -28,6 +28,8 @@ #include "syncUtil.h" #include "syncVoteMgr.h" +static int64_t sSyncLogBufferBytes = 0; // total bytes of vnode log buffer + static bool syncIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM); @@ -101,6 +103,10 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term}; pBuf->entries[index % pBuf->size] = tmp; pBuf->endIndex = index + 1; + if (pNode->vgId > 1) { + pBuf->bytes += pEntry->bytes; + atomic_add_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + } (void)taosThreadMutexUnlock(&pBuf->mutex); TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); @@ -330,6 +336,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { (void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); } pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0; + pBuf->bytes = 0; int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode); if (code < 0) { sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code)); @@ -470,8 +477,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt goto _out; } SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm}; - pEntry = NULL; pBuf->entries[index % pBuf->size] = tmp; + if (pNode->vgId > 1) { + pBuf->bytes += pEntry->bytes; + atomic_add_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + } + pEntry = NULL; // update end index pBuf->endIndex = TMAX(index + 1, pBuf->endIndex); @@ -846,14 +857,36 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } // recycle - SyncIndex until = pBuf->commitIndex; // - TSDB_SYNC_LOG_BUFFER_RETENTION; - for (SyncIndex index = pBuf->startIndex; index < until; index++) { - SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; - if (pEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR; + bool isVnode = pNode->vgId > 1; + SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION; + do { + if (pBuf->startIndex >= pBuf->commitIndex) { + break; + } + SSyncRaftEntry* pEntry = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem; + if (pEntry == NULL) { + sError("vgId:%d, invalid log entry to recycle. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64 + ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64, + pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (!((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD && + atomic_load_64(&sSyncLogBufferBytes) >= tsLogBufferMemoryAllowed))) { + break; + } + if (isVnode) { + pBuf->bytes -= pEntry->bytes; + atomic_sub_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + } + sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64 + ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64 + ", total bytes:%" PRId64, + pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term, + pEntry->bytes, pBuf->bytes, atomic_load_64(&sSyncLogBufferBytes)); syncEntryDestroy(pEntry); - (void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); - pBuf->startIndex = index + 1; - } + memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); + ++pBuf->startIndex; + } while (true); code = 0; _out: @@ -1324,6 +1357,7 @@ void syncLogBufferClear(SSyncLogBuffer* pBuf) { (void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); } pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0; + pBuf->bytes = 0; (void)taosThreadMutexUnlock(&pBuf->mutex); } From e92a58043ab8923b2f0b2301185c092db190bf1f Mon Sep 17 00:00:00 2001 From: Kaili Xu Date: Wed, 11 Sep 2024 14:44:29 +0800 Subject: [PATCH 04/25] enh: code optimization --- source/libs/sync/src/syncPipeline.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 5d20f9290b..1ba9de8fa7 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -860,7 +860,9 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm bool isVnode = pNode->vgId > 1; SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION; do { - if (pBuf->startIndex >= pBuf->commitIndex) { + if ((pBuf->startIndex >= pBuf->commitIndex) || + !((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD && + atomic_load_64(&sSyncLogBufferBytes) >= tsLogBufferMemoryAllowed))) { break; } SSyncRaftEntry* pEntry = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem; @@ -870,10 +872,6 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term); return TSDB_CODE_SYN_INTERNAL_ERROR; } - if (!((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD && - atomic_load_64(&sSyncLogBufferBytes) >= tsLogBufferMemoryAllowed))) { - break; - } if (isVnode) { pBuf->bytes -= pEntry->bytes; atomic_sub_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); From ffaa1092c027577eb28e616c8feebb6e9ccfe977 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 11 Sep 2024 14:51:52 +0800 Subject: [PATCH 05/25] fix: oom with large msg --- source/libs/sync/src/syncPipeline.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 1ba9de8fa7..d7c55fcd8a 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -28,7 +28,7 @@ #include "syncUtil.h" #include "syncVoteMgr.h" -static int64_t sSyncLogBufferBytes = 0; // total bytes of vnode log buffer +int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer static bool syncIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || @@ -105,7 +105,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt pBuf->endIndex = index + 1; if (pNode->vgId > 1) { pBuf->bytes += pEntry->bytes; - atomic_add_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } (void)taosThreadMutexUnlock(&pBuf->mutex); @@ -480,7 +480,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt pBuf->entries[index % pBuf->size] = tmp; if (pNode->vgId > 1) { pBuf->bytes += pEntry->bytes; - atomic_add_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } pEntry = NULL; @@ -862,7 +862,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm do { if ((pBuf->startIndex >= pBuf->commitIndex) || !((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD && - atomic_load_64(&sSyncLogBufferBytes) >= tsLogBufferMemoryAllowed))) { + atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) { break; } SSyncRaftEntry* pEntry = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem; @@ -874,13 +874,13 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } if (isVnode) { pBuf->bytes -= pEntry->bytes; - atomic_sub_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64 ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64 ", total bytes:%" PRId64, pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term, - pEntry->bytes, pBuf->bytes, atomic_load_64(&sSyncLogBufferBytes)); + pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed)); syncEntryDestroy(pEntry); memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); ++pBuf->startIndex; From 8b24bae5334a8fe188b32fae3d784eddb77af612 Mon Sep 17 00:00:00 2001 From: Kaili Xu Date: Wed, 11 Sep 2024 17:08:34 +0800 Subject: [PATCH 06/25] enh: return value of memset --- source/libs/sync/src/syncPipeline.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index d7c55fcd8a..ae0d5b7ddb 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -882,7 +882,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term, pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed)); syncEntryDestroy(pEntry); - memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); + (void)memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); ++pBuf->startIndex; } while (true); From e127a29e64870330a78497180dd8e3ac1f0cbb91 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 12 Sep 2024 05:55:22 +0800 Subject: [PATCH 07/25] fix: oom with large msg --- source/libs/sync/src/syncPipeline.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index d7c55fcd8a..72d586f47e 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -266,6 +266,10 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1}; pBuf->entries[index % pBuf->size] = tmp; taken = true; + if (pNode->vgId > 1) { + pBuf->bytes += pEntry->bytes; + atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + } } if (index < toIndex) { @@ -292,6 +296,10 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { } SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm}; pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp; + if (pNode->vgId > 1) { + pBuf->bytes += pDummy->bytes; + atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes); + } if (index < toIndex) { pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex; @@ -878,9 +886,9 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64 ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64 - ", total bytes:%" PRId64, + ", used:%" PRId64 ", allowed:%" PRId64, pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term, - pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed)); + pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed), tsLogBufferMemoryAllowed); syncEntryDestroy(pEntry); memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); ++pBuf->startIndex; From e3121c24ebae09ba5e09f1be26406256046e358e Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 12 Sep 2024 06:38:15 +0800 Subject: [PATCH 08/25] fix: compile problem --- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncPipeline.c | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 0b653ddbe9..2ff890da56 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "sync.h" -#include "tglobal.h" +#include "tdef.h" #include "trpc.h" #include "ttimer.h" diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 5eac01b884..279583340d 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -27,6 +27,7 @@ #include "syncSnapshot.h" #include "syncUtil.h" #include "syncVoteMgr.h" +#include "tglobal.h" int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer From 15c92cea3fda0e8204fac0af9044ab4fd57b5dfa Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 12 Sep 2024 07:11:26 +0800 Subject: [PATCH 09/25] fix: compile problem --- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncPipeline.c | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 2ff890da56..0b653ddbe9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "sync.h" -#include "tdef.h" +#include "tglobal.h" #include "trpc.h" #include "ttimer.h" diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 279583340d..5eac01b884 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -27,7 +27,6 @@ #include "syncSnapshot.h" #include "syncUtil.h" #include "syncVoteMgr.h" -#include "tglobal.h" int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer From d9a25b31fe6346edc709bace7cc7698d1e40744f Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 12 Sep 2024 08:32:59 +0800 Subject: [PATCH 10/25] fix: compile problem --- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncPipeline.c | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 0b653ddbe9..2ff890da56 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "sync.h" -#include "tglobal.h" +#include "tdef.h" #include "trpc.h" #include "ttimer.h" diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 5eac01b884..c891db07c6 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -27,8 +27,9 @@ #include "syncSnapshot.h" #include "syncUtil.h" #include "syncVoteMgr.h" +#include "tglobal.h" -int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer +static int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer static bool syncIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || From d1cb30c47396a2cba83d9148ae4b5f653e36ebaf Mon Sep 17 00:00:00 2001 From: Kaili Xu Date: Thu, 12 Sep 2024 10:47:49 +0800 Subject: [PATCH 11/25] chore: restore header file --- source/libs/sync/inc/syncInt.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 2ff890da56..009854b45b 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "sync.h" -#include "tdef.h" +#include "taosdef.h" #include "trpc.h" #include "ttimer.h" From 36a3a035b033ba7f55cd1dcdc9da8080b26a467b Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 12 Sep 2024 11:02:27 +0800 Subject: [PATCH 12/25] fix: compile problem --- source/libs/sync/src/syncPipeline.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index c891db07c6..a7c22ffd55 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -106,7 +106,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt pBuf->endIndex = index + 1; if (pNode->vgId > 1) { pBuf->bytes += pEntry->bytes; - atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } (void)taosThreadMutexUnlock(&pBuf->mutex); @@ -269,7 +269,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { taken = true; if (pNode->vgId > 1) { pBuf->bytes += pEntry->bytes; - atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } } @@ -299,7 +299,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp; if (pNode->vgId > 1) { pBuf->bytes += pDummy->bytes; - atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes); + (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes); } if (index < toIndex) { @@ -489,7 +489,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt pBuf->entries[index % pBuf->size] = tmp; if (pNode->vgId > 1) { pBuf->bytes += pEntry->bytes; - atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } pEntry = NULL; @@ -883,7 +883,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } if (isVnode) { pBuf->bytes -= pEntry->bytes; - atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + (void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64 ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64 From c2d28a208eabb537d813fa9e07bc018c51c44b0b Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 12 Sep 2024 11:18:10 +0800 Subject: [PATCH 13/25] chore: code optimization --- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncPipeline.c | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 009854b45b..0b653ddbe9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "sync.h" -#include "taosdef.h" +#include "tglobal.h" #include "trpc.h" #include "ttimer.h" diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index a7c22ffd55..44f5a293a9 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -27,7 +27,6 @@ #include "syncSnapshot.h" #include "syncUtil.h" #include "syncVoteMgr.h" -#include "tglobal.h" static int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer From 295a34fc55654a9954f6cf6e3f5459ec8922172c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 12 Sep 2024 16:09:09 +0800 Subject: [PATCH 14/25] fix:[TD-31962]memory leak by crash_gen --- source/client/src/clientEnv.c | 2 +- source/client/src/clientImpl.c | 5 ++++- source/client/src/clientMain.c | 5 ++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 101bd9341c..fec1060042 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -941,7 +941,7 @@ void taos_init_imp(void) { tscInitRes = TSDB_CODE_OUT_OF_MEMORY; return; } -// taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst); avoid heap use after free + taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst); deltaToUtcInitOnce(); char logDirName[64] = {0}; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index d77b8dcbb7..80ab094eaf 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -77,7 +77,10 @@ bool chkRequestKilled(void* param) { return killed; } -void cleanupAppInfo() { taosHashCleanup(appInfo.pInstMap); } +void cleanupAppInfo() { + taosHashCleanup(appInfo.pInstMap); + taosHashCleanup(appInfo.pInstMapByClusterId); +} static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 1c1fff9b7b..829c696c7c 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -73,6 +73,8 @@ void taos_cleanup(void) { tscWarn("failed to cleanup task queue"); } + tmqMgmtClose(); + int32_t id = clientReqRefPool; clientReqRefPool = -1; taosCloseRef(id); @@ -87,9 +89,6 @@ void taos_cleanup(void) { tscDebug("rpc cleanup"); taosConvDestroy(); - - tmqMgmtClose(); - DestroyRegexCache(); tscInfo("all local resources released"); From 3c8ae1fc9f30ba1f19a8916462c4ee4fe8344170 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 13 Sep 2024 09:27:11 +0800 Subject: [PATCH 15/25] fix:[TD-31962]memory leak by crash_gen --- source/libs/executor/src/scanoperator.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 717b8793f2..b641c66e45 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -741,7 +741,6 @@ _end: sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW, NULL); if (insertRet != TAOS_LRU_STATUS_OK) { qWarn("failed to put meta into lru cache, code:%d, %s", insertRet, idStr); - freeTableCachedVal(pVal); } } From 8547b8c942dbb01e043ba654b34e55f38a80a99f Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 13 Sep 2024 09:54:26 +0800 Subject: [PATCH 16/25] fix(tsdb/cache/read): comment lastTs out to load stt data --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index b24aa6fb1d..72f2052867 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -625,7 +625,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } double cost = (taosGetTimestampUs() - st) / 1000.0; if (cost > tsCacheLazyLoadThreshold) { - pr->lastTs = totalLastTs; + // pr->lastTs = totalLastTs; } } } From 1eb3124cbd7e7039940dce7866998397cd365a3b Mon Sep 17 00:00:00 2001 From: wangmeng Date: Fri, 13 Sep 2024 10:31:10 +0800 Subject: [PATCH 17/25] add a test case to verify TS-5349 --- .../query/sys/tb_perf_queries_exist_test.py | 26 +++++++++++++++++++ tests/parallel_test/cases.task | 1 + 2 files changed, 27 insertions(+) create mode 100644 tests/army/query/sys/tb_perf_queries_exist_test.py diff --git a/tests/army/query/sys/tb_perf_queries_exist_test.py b/tests/army/query/sys/tb_perf_queries_exist_test.py new file mode 100644 index 0000000000..e6afc0bec6 --- /dev/null +++ b/tests/army/query/sys/tb_perf_queries_exist_test.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- + +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame import * +from frame.autogen import * + +''' + TS-5349: https://jira.taosdata.com:18080/browse/TS-5349 + 查询 performance_schema.perf_queries 后, 再查询 information_schema.perf_queries, + 正常情况下在 information_schema 中不存在表 perf_queries +''' + +class TDTestCase(TBase): + + def run(self): + tdSql.query("select * from performance_schema.perf_queries;") + tdLog.info("Table [perf_queries] exist in schema [performance_schema]") + + tdSql.error("select * from information_schema.perf_queries;") + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 3c2adfb46f..92f6f32943 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -42,6 +42,7 @@ ,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_compare_asc_desc.py ,,y,army,./pytest.sh python3 ./test.py -f query/last/test_last.py ,,y,army,./pytest.sh python3 ./test.py -f query/window/base.py +,,y,army,./pytest.sh python3 ./test.py -f query/sys/tb_perf_queries_exist_test.py -N 3 # # system test From 9cf662f1642be01a76895ee67c48ef2a562b5e6b Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Fri, 13 Sep 2024 10:58:47 +0800 Subject: [PATCH 18/25] fix: disable dynamic adjustment of 'keepaliveidle' --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 5b67e1267b..0937a56b3c 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -613,7 +613,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, CFG_SCOPE_BOTH, CFG_DYN_NONE)); tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_ENT_BOTH)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_NONE)); tsNumOfTaskQueueThreads = tsNumOfCores * 2; tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 16); From 987e086918c8a3cdfad79cdc378331a6f1e35b15 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 13 Sep 2024 12:32:25 +0800 Subject: [PATCH 19/25] enh(stmt2/gettbname): ignore tbname not set error --- source/client/src/clientStmt2.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 01a7c4be4c..ad8fc52b95 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -1888,7 +1888,12 @@ int stmtGetParamTbName(TAOS_STMT2* stmt, int* nums) { STMT_ERR_RET(stmtParseSql(pStmt)); } - *nums = STMT_TYPE_MULTI_INSERT == pStmt->sql.type ? 1 : 0; + if (TSDB_CODE_TSC_STMT_TBNAME_ERROR == pStmt->errCode) { + *nums = 1; + pStmt->errCode = TSDB_CODE_SUCCESS; + } else { + *nums = STMT_TYPE_MULTI_INSERT == pStmt->sql.type ? 1 : 0; + } return TSDB_CODE_SUCCESS; } From b69112bf416af52c762dbd647b2a8b7c5f429d21 Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 13 Sep 2024 02:20:07 +0000 Subject: [PATCH 20/25] fix/TS-5404-disable-mnode-sync --- source/dnode/mnode/impl/src/mndSync.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 2b32dc7781..e73cc1b5db 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -543,14 +543,14 @@ void mndSyncCheckTimeout(SMnode *pMnode) { if (delta > MNODE_TIMEOUT_SEC) { mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId, pMgmt->transSec, curSec, delta, pMgmt->transSeq); - pMgmt->transId = 0; - pMgmt->transSec = 0; - pMgmt->transSeq = 0; - terrno = TSDB_CODE_SYN_TIMEOUT; - pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT; - if (tsem_post(&pMgmt->syncSem) < 0) { - mError("failed to post sem"); - } + // pMgmt->transId = 0; + // pMgmt->transSec = 0; + // pMgmt->transSeq = 0; + // terrno = TSDB_CODE_SYN_TIMEOUT; + // pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT; + //if (tsem_post(&pMgmt->syncSem) < 0) { + // mError("failed to post sem"); + //} } else { mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId, pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq); From 8bf6b5f6ec37ba455aabf6a21a5ff255df40b21c Mon Sep 17 00:00:00 2001 From: wade zhang <95411902+gccgdb1234@users.noreply.github.com> Date: Fri, 13 Sep 2024 16:17:12 +0800 Subject: [PATCH 21/25] Update 22-meta.md --- docs/zh/14-reference/03-taos-sql/22-meta.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh/14-reference/03-taos-sql/22-meta.md b/docs/zh/14-reference/03-taos-sql/22-meta.md index 35b71feb2c..397b3d3362 100644 --- a/docs/zh/14-reference/03-taos-sql/22-meta.md +++ b/docs/zh/14-reference/03-taos-sql/22-meta.md @@ -93,8 +93,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | 4 | vgroups | INT | 数据库中有多少个 vgroup。需要注意,`vgroups` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 6 | replica | INT | 副本数。需要注意,`replica` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 7 | strict | VARCHAR(4) | 废弃参数 | -| 8 | duration | VARCHAR(10) | 单文件存储数据的时间跨度。需要注意,`duration` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 9 | keep | VARCHAR(32) | 数据保留时长。需要注意,`keep` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 8 | duration | VARCHAR(10) | 单文件存储数据的时间跨度。需要注意,`duration` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。内部存储单位为分钟,查询时有可能转换为天或小时展示 | +| 9 | keep | VARCHAR(32) | 数据保留时长。需要注意,`keep` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 内部存储单位为分钟,查询时有可能转换为天或小时展示 | | 10 | buffer | INT | 每个 vnode 写缓存的内存块大小,单位 MB。需要注意,`buffer` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 11 | pagesize | INT | 每个 VNODE 中元数据存储引擎的页大小,单位为 KB。需要注意,`pagesize` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 12 | pages | INT | 每个 vnode 元数据存储引擎的缓存页个数。需要注意,`pages` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | From b836235b67d3514bc6f06af2b4750edd1d042885 Mon Sep 17 00:00:00 2001 From: qevolg <2227465945@qq.com> Date: Fri, 13 Sep 2024 13:14:10 +0000 Subject: [PATCH 22/25] [TS-4893]: add CI test cases test_function --- tests/army/query/function/ans/rand.csv | 21 +++++ tests/army/query/function/in/rand.in | 3 + tests/army/query/function/test_function.py | 101 +++++++++++++++++---- 3 files changed, 108 insertions(+), 17 deletions(-) create mode 100644 tests/army/query/function/ans/rand.csv create mode 100644 tests/army/query/function/in/rand.in diff --git a/tests/army/query/function/ans/rand.csv b/tests/army/query/function/ans/rand.csv new file mode 100644 index 0000000000..40e20c5ba4 --- /dev/null +++ b/tests/army/query/function/ans/rand.csv @@ -0,0 +1,21 @@ +0.663936012733698 +0.840187717154710 +0.840187717154710 +0.700976369297587 +0.561380175203728 +0.916457875592847 +0.274745596235034 +0.135438768721856 +0.486904139391568 +0.352760728612896 +0.206965447965528 +0.419929514834624 +0.419929514834624 +0.419929514834624 +0.419929514834624 +0.419929514834624 +0.419929514834624 +0.419929514834624 +0.419929514834624 +0.419929514834624 +0.419929514834624 \ No newline at end of file diff --git a/tests/army/query/function/in/rand.in b/tests/army/query/function/in/rand.in new file mode 100644 index 0000000000..185a76d6f9 --- /dev/null +++ b/tests/army/query/function/in/rand.in @@ -0,0 +1,3 @@ +select RAND(1245); +select RAND(id) from ts_4893.d0 limit 10; +select RAND(id) from ts_4893.d0 order by id desc limit 10; \ No newline at end of file diff --git a/tests/army/query/function/test_function.py b/tests/army/query/function/test_function.py index 4981e93563..18a0d46711 100644 --- a/tests/army/query/function/test_function.py +++ b/tests/army/query/function/test_function.py @@ -509,31 +509,98 @@ class TDTestCase(TBase): tdSql.error( "select * from (select to_iso8601(ts, timezone()), timezone() from meters order by ts desc) limit 1000;", expectErrInfo="Not supported timzone format") # TS-5340 + + def test_rand(self): + self.test_normal_query("rand") + + tdSql.query("select rand();") + tdSql.checkRows(1) + tdSql.checkCols(1) + self.check_result_in_range(0, 0) + + tdSql.query("select rand(null);") + tdSql.checkRows(1) + tdSql.checkCols(1) + self.check_result_in_range(0, 0) + + tdSql.query("select rand() from (select 1) t limit 1;") + tdSql.checkRows(1) + tdSql.checkCols(1) + self.check_result_in_range(0, 0) + + tdSql.query("select rand(id) from ts_4893.d0 limit 100;") + tdSql.checkRows(100) + tdSql.checkCols(1) + for i in range(len(tdSql.res)): + self.check_result_in_range(i, 0) + + tdSql.query("select rand(id) from ts_4893.meters limit 100;") + tdSql.checkRows(100) + tdSql.checkCols(1) + for i in range(len(tdSql.res)): + self.check_result_in_range(i, 0) + + tdSql.query("select rand(123), rand(123);") + tdSql.checkRows(1) + tdSql.checkCols(2) + if tdSql.res[0][0] != tdSql.res[0][1]: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno, tdSql.sql, tdSql.res[0][0], tdSql.res[0][1]) + tdLog.exit("%s(%d) failed: sql:%s data1:%s ne data2:%s" % args) + + def check_result_in_range(self, row, col): + res = tdSql.res[row][col] + if res < 0 or res >= 1: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno, tdSql.sql, row, col, res) + tdLog.exit("%s(%d) failed: sql:%s row:%s col:%s data:%s lt 0 or ge 1" % args) + + def test_max(self): + self.test_normal_query("max") + + tdSql.query("select max(null) from ts_4893.meters;") + tdSql.checkRows(1) + tdSql.checkCols(1) + tdSql.checkData(0, 0, 'None') + + tdSql.query("select max(id) from ts_4893.meters;") + tdSql.checkRows(1) + + tdSql.query("select max(name) from ts_4893.meters;") + tdSql.checkRows(1) + + tdSql.query("select max(current) from ts_4893.meters;") + tdSql.checkRows(1) + + tdSql.query("select max(nch1) from ts_4893.meters;") + tdSql.checkRows(1) + + tdSql.query("select max(var1) from ts_4893.meters;") + tdSql.checkRows(1) + def test_min(self): self.test_normal_query("min") - tdSql.query("select min(var1), min(id) from ts_4893.d0;") + tdSql.query("select min(null) from ts_4893.meters;") tdSql.checkRows(1) - tdSql.checkData(0, 0, 'abc一二三abc一二三abc') - tdSql.checkData(0, 1, 0) - def test_max(self): - self.test_normal_query("max") - tdSql.query("select max(var1), max(id) from ts_4893.d0;") - tdSql.checkRows(1) - tdSql.checkData(0, 0, '一二三四五六七八九十') - tdSql.checkData(0, 1, 9999) - def test_rand(self): - tdSql.query("select rand();") + tdSql.checkCols(1) + tdSql.checkData(0, 0, 'None') + + tdSql.query("select min(id) from ts_4893.meters;") tdSql.checkRows(1) - tdSql.query("select rand(1);") + tdSql.query("select min(name) from ts_4893.meters;") tdSql.checkRows(1) - tdSql.query("select rand(1) from ts_4893.meters limit 10;") - tdSql.checkRows(10) + tdSql.query("select min(current) from ts_4893.meters;") + tdSql.checkRows(1) + + tdSql.query("select min(nch1) from ts_4893.meters;") + tdSql.checkRows(1) + + tdSql.query("select min(var1) from ts_4893.meters;") + tdSql.checkRows(1) - tdSql.query("select rand(id) from ts_4893.d0 limit 10;") - tdSql.checkRows(10) # run def run(self): tdLog.debug(f"start to excute {__file__}") @@ -576,8 +643,8 @@ class TDTestCase(TBase): self.test_varpop() # select function - self.test_min() self.test_max() + self.test_min() # error function self.test_error() From a13b385a16c639975d1447538f3e5e635012a46f Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 13 Sep 2024 16:49:03 +0800 Subject: [PATCH 23/25] docs: add description of rpcQueueMemoryAllowed/syncLogBufferMemoryAllowed --- docs/zh/14-reference/01-components/01-taosd.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/zh/14-reference/01-components/01-taosd.md b/docs/zh/14-reference/01-components/01-taosd.md index 004116b478..163e95d228 100644 --- a/docs/zh/14-reference/01-components/01-taosd.md +++ b/docs/zh/14-reference/01-components/01-taosd.md @@ -160,6 +160,12 @@ charset 的有效值是 UTF-8。 | :----------------: | :---------------------------------------------: | | numOfCommitThreads | 写入线程的最大数量,取值范围 0-1024,缺省值为 4 | +### 内存相关 +| 参数名称 | 参数说明 | +| :----------------: | :---------------------------------------------: | +| rpcQueueMemoryAllowed | 一个 dnode 允许的 rpc 消息占用的内存最大值,单位 bytes,取值范围:10485760-INT64_MAX,缺省值:服务器内存的 1/10 | +| syncLogBufferMemoryAllowed | 一个 dnode 允许的 sync 日志缓存消息占用的内存最大值,单位 bytes,取值范围:10485760-INT64_MAX,缺省值:服务器内存的 1/10, 3.1.3.2/3.3.2.13 版本开始生效 | + ### 日志相关 | 参数名称 | 参数说明 | From 719a5f4a8df7d2a46267e3647509126fdad20d23 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 13 Sep 2024 16:50:52 +0800 Subject: [PATCH 24/25] docs: add description of rpcQueueMemoryAllowed/syncLogBufferMemoryAllowed --- docs/zh/14-reference/01-components/01-taosd.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/14-reference/01-components/01-taosd.md b/docs/zh/14-reference/01-components/01-taosd.md index 163e95d228..a3e6d7693d 100644 --- a/docs/zh/14-reference/01-components/01-taosd.md +++ b/docs/zh/14-reference/01-components/01-taosd.md @@ -164,7 +164,7 @@ charset 的有效值是 UTF-8。 | 参数名称 | 参数说明 | | :----------------: | :---------------------------------------------: | | rpcQueueMemoryAllowed | 一个 dnode 允许的 rpc 消息占用的内存最大值,单位 bytes,取值范围:10485760-INT64_MAX,缺省值:服务器内存的 1/10 | -| syncLogBufferMemoryAllowed | 一个 dnode 允许的 sync 日志缓存消息占用的内存最大值,单位 bytes,取值范围:10485760-INT64_MAX,缺省值:服务器内存的 1/10, 3.1.3.2/3.3.2.13 版本开始生效 | +| syncLogBufferMemoryAllowed | 一个 dnode 允许的 sync 日志缓存消息占用的内存最大值,单位 bytes,取值范围:10485760-INT64_MAX,缺省值:服务器内存的 1/10,3.1.3.2/3.3.2.13 版本开始生效 | ### 日志相关 From 36a038529a35f2dc0bf9de5aa1f282cbdba8b359 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 13 Sep 2024 16:51:57 +0800 Subject: [PATCH 25/25] docs: add description of rpcQueueMemoryAllowed/syncLogBufferMemoryAllowed --- docs/zh/14-reference/01-components/01-taosd.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/zh/14-reference/01-components/01-taosd.md b/docs/zh/14-reference/01-components/01-taosd.md index a3e6d7693d..ff6b27092d 100644 --- a/docs/zh/14-reference/01-components/01-taosd.md +++ b/docs/zh/14-reference/01-components/01-taosd.md @@ -154,18 +154,18 @@ charset 的有效值是 UTF-8。 | :-----------: | :-------------------------------------------------------------------------: | | supportVnodes | dnode 支持的最大 vnode 数目,取值范围:0-4096,缺省值: CPU 核数的 2 倍 + 5 | -### 性能调优 - -| 参数名称 | 参数说明 | -| :----------------: | :---------------------------------------------: | -| numOfCommitThreads | 写入线程的最大数量,取值范围 0-1024,缺省值为 4 | - ### 内存相关 | 参数名称 | 参数说明 | | :----------------: | :---------------------------------------------: | | rpcQueueMemoryAllowed | 一个 dnode 允许的 rpc 消息占用的内存最大值,单位 bytes,取值范围:10485760-INT64_MAX,缺省值:服务器内存的 1/10 | | syncLogBufferMemoryAllowed | 一个 dnode 允许的 sync 日志缓存消息占用的内存最大值,单位 bytes,取值范围:10485760-INT64_MAX,缺省值:服务器内存的 1/10,3.1.3.2/3.3.2.13 版本开始生效 | +### 性能调优 + +| 参数名称 | 参数说明 | +| :----------------: | :---------------------------------------------: | +| numOfCommitThreads | 写入线程的最大数量,取值范围 0-1024,缺省值为 4 | + ### 日志相关 | 参数名称 | 参数说明 |