Merge branch '3.0' into enh/TS-5111-3.0
This commit is contained in:
commit
ced55162ae
|
@ -154,6 +154,12 @@ charset 的有效值是 UTF-8。
|
|||
| :-----------: | :-------------------------------------------------------------------------: |
|
||||
| supportVnodes | dnode 支持的最大 vnode 数目,取值范围:0-4096,缺省值: CPU 核数的 2 倍 + 5 |
|
||||
|
||||
### 内存相关
|
||||
| 参数名称 | 参数说明 |
|
||||
| :----------------: | :---------------------------------------------: |
|
||||
| rpcQueueMemoryAllowed | 一个 dnode 允许的 rpc 消息占用的内存最大值,单位 bytes,取值范围:10485760-INT64_MAX,缺省值:服务器内存的 1/10 |
|
||||
| syncLogBufferMemoryAllowed | 一个 dnode 允许的 sync 日志缓存消息占用的内存最大值,单位 bytes,取值范围:10485760-INT64_MAX,缺省值:服务器内存的 1/10,3.1.3.2/3.3.2.13 版本开始生效 |
|
||||
|
||||
### 性能调优
|
||||
|
||||
| 参数名称 | 参数说明 |
|
||||
|
|
|
@ -93,8 +93,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
|
|||
| 4 | vgroups | INT | 数据库中有多少个 vgroup。需要注意,`vgroups` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
|
||||
| 6 | replica | INT | 副本数。需要注意,`replica` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
|
||||
| 7 | strict | VARCHAR(4) | 废弃参数 |
|
||||
| 8 | duration | VARCHAR(10) | 单文件存储数据的时间跨度。需要注意,`duration` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
|
||||
| 9 | keep | VARCHAR(32) | 数据保留时长。需要注意,`keep` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
|
||||
| 8 | duration | VARCHAR(10) | 单文件存储数据的时间跨度。需要注意,`duration` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。内部存储单位为分钟,查询时有可能转换为天或小时展示 |
|
||||
| 9 | keep | VARCHAR(32) | 数据保留时长。需要注意,`keep` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 内部存储单位为分钟,查询时有可能转换为天或小时展示 |
|
||||
| 10 | buffer | INT | 每个 vnode 写缓存的内存块大小,单位 MB。需要注意,`buffer` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
|
||||
| 11 | pagesize | INT | 每个 VNODE 中元数据存储引擎的页大小,单位为 KB。需要注意,`pagesize` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
|
||||
| 12 | pages | INT | 每个 vnode 元数据存储引擎的缓存页个数。需要注意,`pages` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
|
||||
|
|
|
@ -94,6 +94,7 @@ extern int32_t tsElectInterval;
|
|||
extern int32_t tsHeartbeatInterval;
|
||||
extern int32_t tsHeartbeatTimeout;
|
||||
extern int32_t tsSnapReplMaxWaitN;
|
||||
extern int64_t tsLogBufferMemoryAllowed; // maximum allowed log buffer size in bytes for each dnode
|
||||
|
||||
// arbitrator
|
||||
extern int32_t tsArbHeartBeatIntervalSec;
|
||||
|
|
|
@ -966,6 +966,7 @@ int32_t taosGetErrSize();
|
|||
#define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013)
|
||||
#define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014)
|
||||
#define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015)
|
||||
#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016)
|
||||
|
||||
// stream
|
||||
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
||||
|
|
|
@ -332,6 +332,7 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_MAX_LEARNER_REPLICA 10
|
||||
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096
|
||||
#define TSDB_SYNC_LOG_BUFFER_RETENTION 256
|
||||
#define TSDB_SYNC_LOG_BUFFER_THRESHOLD (1024 * 1024 * 5)
|
||||
#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512
|
||||
#define TSDB_SYNC_NEGOTIATION_WIN 512
|
||||
|
||||
|
|
|
@ -77,7 +77,10 @@ bool chkRequestKilled(void* param) {
|
|||
return killed;
|
||||
}
|
||||
|
||||
void cleanupAppInfo() { taosHashCleanup(appInfo.pInstMap); }
|
||||
void cleanupAppInfo() {
|
||||
taosHashCleanup(appInfo.pInstMap);
|
||||
taosHashCleanup(appInfo.pInstMapByClusterId);
|
||||
}
|
||||
|
||||
static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
|
||||
SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj);
|
||||
|
|
|
@ -73,6 +73,8 @@ void taos_cleanup(void) {
|
|||
tscWarn("failed to cleanup task queue");
|
||||
}
|
||||
|
||||
tmqMgmtClose();
|
||||
|
||||
int32_t id = clientReqRefPool;
|
||||
clientReqRefPool = -1;
|
||||
taosCloseRef(id);
|
||||
|
@ -87,9 +89,6 @@ void taos_cleanup(void) {
|
|||
tscDebug("rpc cleanup");
|
||||
|
||||
taosConvDestroy();
|
||||
|
||||
tmqMgmtClose();
|
||||
|
||||
DestroyRegexCache();
|
||||
|
||||
tscInfo("all local resources released");
|
||||
|
|
|
@ -1889,7 +1889,12 @@ int stmtGetParamTbName(TAOS_STMT2* stmt, int* nums) {
|
|||
STMT_ERR_RET(stmtParseSql(pStmt));
|
||||
}
|
||||
|
||||
*nums = STMT_TYPE_MULTI_INSERT == pStmt->sql.type ? 1 : 0;
|
||||
if (TSDB_CODE_TSC_STMT_TBNAME_ERROR == pStmt->errCode) {
|
||||
*nums = 1;
|
||||
pStmt->errCode = TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
*nums = STMT_TYPE_MULTI_INSERT == pStmt->sql.type ? 1 : 0;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -2906,7 +2906,10 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
|
||||
taosFreeQitem(pWrapper);
|
||||
} else {
|
||||
(void)taosWriteQitem(tmq->mqueue, pWrapper);
|
||||
if (taosWriteQitem(tmq->mqueue, pWrapper) != 0){
|
||||
tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
|
||||
taosFreeQitem(pWrapper);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -81,6 +81,7 @@ int32_t tsElectInterval = 25 * 1000;
|
|||
int32_t tsHeartbeatInterval = 1000;
|
||||
int32_t tsHeartbeatTimeout = 20 * 1000;
|
||||
int32_t tsSnapReplMaxWaitN = 128;
|
||||
int64_t tsLogBufferMemoryAllowed = 0; // bytes
|
||||
|
||||
// mnode
|
||||
int64_t tsMndSdbWriteDelta = 200;
|
||||
|
@ -613,7 +614,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, CFG_SCOPE_BOTH, CFG_DYN_NONE));
|
||||
|
||||
tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000);
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_ENT_BOTH));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_NONE));
|
||||
|
||||
tsNumOfTaskQueueThreads = tsNumOfCores * 2;
|
||||
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 16);
|
||||
|
@ -702,6 +703,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||
tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
||||
|
||||
tsLogBufferMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||
tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
||||
|
||||
// clang-format off
|
||||
TAOS_CHECK_RETURN(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
|
@ -736,6 +740,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "syncLogBufferMemoryAllowed", tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));
|
||||
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbHeartBeatIntervalSec", tsArbHeartBeatIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbCheckSyncIntervalSec", tsArbCheckSyncIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
|
@ -970,6 +975,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
|
|||
pItem->stype = stype;
|
||||
}
|
||||
|
||||
pItem = cfgGetItem(tsCfg, "syncLogBufferMemoryAllowed");
|
||||
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
|
||||
tsLogBufferMemoryAllowed = totalMemoryKB * 1024 * 0.1;
|
||||
tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
||||
pItem->i64 = tsLogBufferMemoryAllowed;
|
||||
pItem->stype = stype;
|
||||
}
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
|
@ -1520,6 +1533,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncSnapReplMaxWaitN");
|
||||
tsSnapReplMaxWaitN = pItem->i32;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncLogBufferMemoryAllowed");
|
||||
tsLogBufferMemoryAllowed = pItem->i64;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbHeartBeatIntervalSec");
|
||||
tsArbHeartBeatIntervalSec = pItem->i32;
|
||||
|
||||
|
@ -1954,6 +1970,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
|
|||
{"randErrorChance", &tsRandErrChance},
|
||||
{"randErrorDivisor", &tsRandErrDivisor},
|
||||
{"randErrorScope", &tsRandErrScope},
|
||||
{"syncLogBufferMemoryAllowed", &tsLogBufferMemoryAllowed},
|
||||
|
||||
{"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold},
|
||||
{"checkpointInterval", &tsStreamCheckpointInterval},
|
||||
|
|
|
@ -10346,7 +10346,11 @@ int32_t tDecodeSMCreateStbRsp(SDecoder *pDecoder, SMCreateStbRsp *pRsp) {
|
|||
}
|
||||
tEndDecode(pDecoder);
|
||||
|
||||
return code;
|
||||
|
||||
_exit:
|
||||
tFreeSTableMetaRsp(pRsp->pMeta);
|
||||
taosMemoryFreeClear(pRsp->pMeta);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -511,6 +511,11 @@ static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerOb
|
|||
}
|
||||
}
|
||||
}
|
||||
// no topics need to be rebalanced
|
||||
if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
|
||||
code = TSDB_CODE_TMQ_NO_NEED_REBALANCE;
|
||||
}
|
||||
|
||||
END:
|
||||
return code;
|
||||
}
|
||||
|
@ -581,6 +586,10 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
if(ubSubscribe){
|
||||
SMqConsumerObj *pConsumerTmp = NULL;
|
||||
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
|
||||
if (taosArrayGetSize(pConsumerTmp->assignedTopics) == 0){
|
||||
mndReleaseConsumer(pMnode, pConsumerTmp);
|
||||
goto END;
|
||||
}
|
||||
mndReleaseConsumer(pMnode, pConsumerTmp);
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));
|
||||
|
@ -599,7 +608,7 @@ END:
|
|||
mndTransDrop(pTrans);
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
|
||||
return code;
|
||||
return (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) ? 0 : code;
|
||||
}
|
||||
|
||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
||||
|
|
|
@ -543,14 +543,14 @@ void mndSyncCheckTimeout(SMnode *pMnode) {
|
|||
if (delta > MNODE_TIMEOUT_SEC) {
|
||||
mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
|
||||
pMgmt->transSec, curSec, delta, pMgmt->transSeq);
|
||||
pMgmt->transId = 0;
|
||||
pMgmt->transSec = 0;
|
||||
pMgmt->transSeq = 0;
|
||||
terrno = TSDB_CODE_SYN_TIMEOUT;
|
||||
pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
|
||||
if (tsem_post(&pMgmt->syncSem) < 0) {
|
||||
mError("failed to post sem");
|
||||
}
|
||||
// pMgmt->transId = 0;
|
||||
// pMgmt->transSec = 0;
|
||||
// pMgmt->transSeq = 0;
|
||||
// terrno = TSDB_CODE_SYN_TIMEOUT;
|
||||
// pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
|
||||
//if (tsem_post(&pMgmt->syncSem) < 0) {
|
||||
// mError("failed to post sem");
|
||||
//}
|
||||
} else {
|
||||
mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
|
||||
pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq);
|
||||
|
|
|
@ -625,7 +625,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
}
|
||||
double cost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
if (cost > tsCacheLazyLoadThreshold) {
|
||||
pr->lastTs = totalLastTs;
|
||||
// pr->lastTs = totalLastTs;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -667,8 +667,8 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
val = *pVal;
|
||||
} else {
|
||||
pCache->cacheHit += 1;
|
||||
STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
|
||||
val = *pVal;
|
||||
STableCachedVal* pValTmp = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
|
||||
val = *pValTmp;
|
||||
|
||||
bool bRes = taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
|
||||
qTrace("release LRU cache, res %d", bRes);
|
||||
|
@ -720,12 +720,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
|
||||
taosMemoryFree(data);
|
||||
}
|
||||
if (code) {
|
||||
if (freeReader) {
|
||||
pHandle->api.metaReaderFn.clearReader(&mr);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
} else { // todo opt for json tag
|
||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
code = colDataSetVal(pColInfoData, i, data, false);
|
||||
|
|
|
@ -21,7 +21,7 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
#include "sync.h"
|
||||
#include "taosdef.h"
|
||||
#include "tglobal.h"
|
||||
#include "trpc.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
|
|
|
@ -54,6 +54,7 @@ typedef struct SSyncLogBuffer {
|
|||
int64_t matchIndex;
|
||||
int64_t endIndex;
|
||||
int64_t size;
|
||||
int64_t bytes;
|
||||
TdThreadMutex mutex;
|
||||
TdThreadMutexAttr attr;
|
||||
int64_t totalIndex;
|
||||
|
|
|
@ -28,6 +28,8 @@
|
|||
#include "syncUtil.h"
|
||||
#include "syncVoteMgr.h"
|
||||
|
||||
static int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer
|
||||
|
||||
static bool syncIsMsgBlock(tmsg_t type) {
|
||||
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
||||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
|
||||
|
@ -101,6 +103,10 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
|
||||
pBuf->entries[index % pBuf->size] = tmp;
|
||||
pBuf->endIndex = index + 1;
|
||||
if (pNode->vgId > 1) {
|
||||
pBuf->bytes += pEntry->bytes;
|
||||
(void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
|
||||
}
|
||||
|
||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
|
@ -260,6 +266,10 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
|||
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
|
||||
pBuf->entries[index % pBuf->size] = tmp;
|
||||
taken = true;
|
||||
if (pNode->vgId > 1) {
|
||||
pBuf->bytes += pEntry->bytes;
|
||||
(void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
|
||||
}
|
||||
}
|
||||
|
||||
if (index < toIndex) {
|
||||
|
@ -286,6 +296,10 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
|||
}
|
||||
SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
|
||||
pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
|
||||
if (pNode->vgId > 1) {
|
||||
pBuf->bytes += pDummy->bytes;
|
||||
(void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes);
|
||||
}
|
||||
|
||||
if (index < toIndex) {
|
||||
pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
|
||||
|
@ -330,6 +344,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
|||
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
||||
}
|
||||
pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
|
||||
pBuf->bytes = 0;
|
||||
int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
|
||||
if (code < 0) {
|
||||
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
|
||||
|
@ -470,8 +485,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
goto _out;
|
||||
}
|
||||
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
|
||||
pEntry = NULL;
|
||||
pBuf->entries[index % pBuf->size] = tmp;
|
||||
if (pNode->vgId > 1) {
|
||||
pBuf->bytes += pEntry->bytes;
|
||||
(void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
|
||||
}
|
||||
pEntry = NULL;
|
||||
|
||||
// update end index
|
||||
pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);
|
||||
|
@ -846,14 +865,34 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
|||
}
|
||||
|
||||
// recycle
|
||||
bool isVnode = pNode->vgId > 1;
|
||||
SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
|
||||
for (SyncIndex index = pBuf->startIndex; index < until; index++) {
|
||||
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
|
||||
if (pEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
do {
|
||||
if ((pBuf->startIndex >= pBuf->commitIndex) ||
|
||||
!((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD &&
|
||||
atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) {
|
||||
break;
|
||||
}
|
||||
SSyncRaftEntry* pEntry = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem;
|
||||
if (pEntry == NULL) {
|
||||
sError("vgId:%d, invalid log entry to recycle. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
|
||||
", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64,
|
||||
pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term);
|
||||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
if (isVnode) {
|
||||
pBuf->bytes -= pEntry->bytes;
|
||||
(void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
|
||||
}
|
||||
sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
|
||||
", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64
|
||||
", used:%" PRId64 ", allowed:%" PRId64,
|
||||
pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term,
|
||||
pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed), tsLogBufferMemoryAllowed);
|
||||
syncEntryDestroy(pEntry);
|
||||
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
||||
pBuf->startIndex = index + 1;
|
||||
}
|
||||
(void)memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
||||
++pBuf->startIndex;
|
||||
} while (true);
|
||||
|
||||
code = 0;
|
||||
_out:
|
||||
|
@ -1324,6 +1363,7 @@ void syncLogBufferClear(SSyncLogBuffer* pBuf) {
|
|||
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
||||
}
|
||||
pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
|
||||
pBuf->bytes = 0;
|
||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||
}
|
||||
|
||||
|
|
|
@ -806,6 +806,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance")
|
||||
|
||||
// stream
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
0.663936012733698
|
||||
0.840187717154710
|
||||
0.840187717154710
|
||||
0.700976369297587
|
||||
0.561380175203728
|
||||
0.916457875592847
|
||||
0.274745596235034
|
||||
0.135438768721856
|
||||
0.486904139391568
|
||||
0.352760728612896
|
||||
0.206965447965528
|
||||
0.419929514834624
|
||||
0.419929514834624
|
||||
0.419929514834624
|
||||
0.419929514834624
|
||||
0.419929514834624
|
||||
0.419929514834624
|
||||
0.419929514834624
|
||||
0.419929514834624
|
||||
0.419929514834624
|
||||
0.419929514834624
|
|
|
@ -0,0 +1,3 @@
|
|||
select RAND(1245);
|
||||
select RAND(id) from ts_4893.d0 limit 10;
|
||||
select RAND(id) from ts_4893.d0 order by id desc limit 10;
|
|
@ -509,31 +509,98 @@ class TDTestCase(TBase):
|
|||
tdSql.error(
|
||||
"select * from (select to_iso8601(ts, timezone()), timezone() from meters order by ts desc) limit 1000;",
|
||||
expectErrInfo="Not supported timzone format") # TS-5340
|
||||
|
||||
def test_rand(self):
|
||||
self.test_normal_query("rand")
|
||||
|
||||
tdSql.query("select rand();")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkCols(1)
|
||||
self.check_result_in_range(0, 0)
|
||||
|
||||
tdSql.query("select rand(null);")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkCols(1)
|
||||
self.check_result_in_range(0, 0)
|
||||
|
||||
tdSql.query("select rand() from (select 1) t limit 1;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkCols(1)
|
||||
self.check_result_in_range(0, 0)
|
||||
|
||||
tdSql.query("select rand(id) from ts_4893.d0 limit 100;")
|
||||
tdSql.checkRows(100)
|
||||
tdSql.checkCols(1)
|
||||
for i in range(len(tdSql.res)):
|
||||
self.check_result_in_range(i, 0)
|
||||
|
||||
tdSql.query("select rand(id) from ts_4893.meters limit 100;")
|
||||
tdSql.checkRows(100)
|
||||
tdSql.checkCols(1)
|
||||
for i in range(len(tdSql.res)):
|
||||
self.check_result_in_range(i, 0)
|
||||
|
||||
tdSql.query("select rand(123), rand(123);")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkCols(2)
|
||||
if tdSql.res[0][0] != tdSql.res[0][1]:
|
||||
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||
args = (caller.filename, caller.lineno, tdSql.sql, tdSql.res[0][0], tdSql.res[0][1])
|
||||
tdLog.exit("%s(%d) failed: sql:%s data1:%s ne data2:%s" % args)
|
||||
|
||||
def check_result_in_range(self, row, col):
|
||||
res = tdSql.res[row][col]
|
||||
if res < 0 or res >= 1:
|
||||
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||
args = (caller.filename, caller.lineno, tdSql.sql, row, col, res)
|
||||
tdLog.exit("%s(%d) failed: sql:%s row:%s col:%s data:%s lt 0 or ge 1" % args)
|
||||
|
||||
def test_max(self):
|
||||
self.test_normal_query("max")
|
||||
|
||||
tdSql.query("select max(null) from ts_4893.meters;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkCols(1)
|
||||
tdSql.checkData(0, 0, 'None')
|
||||
|
||||
tdSql.query("select max(id) from ts_4893.meters;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.query("select max(name) from ts_4893.meters;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.query("select max(current) from ts_4893.meters;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.query("select max(nch1) from ts_4893.meters;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.query("select max(var1) from ts_4893.meters;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
def test_min(self):
|
||||
self.test_normal_query("min")
|
||||
|
||||
tdSql.query("select min(var1), min(id) from ts_4893.d0;")
|
||||
tdSql.query("select min(null) from ts_4893.meters;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, 'abc一二三abc一二三abc')
|
||||
tdSql.checkData(0, 1, 0)
|
||||
def test_max(self):
|
||||
self.test_normal_query("max")
|
||||
tdSql.query("select max(var1), max(id) from ts_4893.d0;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '一二三四五六七八九十')
|
||||
tdSql.checkData(0, 1, 9999)
|
||||
def test_rand(self):
|
||||
tdSql.query("select rand();")
|
||||
tdSql.checkCols(1)
|
||||
tdSql.checkData(0, 0, 'None')
|
||||
|
||||
tdSql.query("select min(id) from ts_4893.meters;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.query("select rand(1);")
|
||||
tdSql.query("select min(name) from ts_4893.meters;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.query("select rand(1) from ts_4893.meters limit 10;")
|
||||
tdSql.checkRows(10)
|
||||
tdSql.query("select min(current) from ts_4893.meters;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.query("select min(nch1) from ts_4893.meters;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.query("select min(var1) from ts_4893.meters;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.query("select rand(id) from ts_4893.d0 limit 10;")
|
||||
tdSql.checkRows(10)
|
||||
# run
|
||||
def run(self):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
|
@ -576,8 +643,8 @@ class TDTestCase(TBase):
|
|||
self.test_varpop()
|
||||
|
||||
# select function
|
||||
self.test_min()
|
||||
self.test_max()
|
||||
self.test_min()
|
||||
|
||||
# error function
|
||||
self.test_error()
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
from frame.log import *
|
||||
from frame.cases import *
|
||||
from frame.sql import *
|
||||
from frame.caseBase import *
|
||||
from frame import *
|
||||
from frame.autogen import *
|
||||
|
||||
'''
|
||||
TS-5349: https://jira.taosdata.com:18080/browse/TS-5349
|
||||
查询 performance_schema.perf_queries 后, 再查询 information_schema.perf_queries,
|
||||
正常情况下在 information_schema 中不存在表 perf_queries
|
||||
'''
|
||||
|
||||
class TDTestCase(TBase):
|
||||
|
||||
def run(self):
|
||||
tdSql.query("select * from performance_schema.perf_queries;")
|
||||
tdLog.info("Table [perf_queries] exist in schema [performance_schema]")
|
||||
|
||||
tdSql.error("select * from information_schema.perf_queries;")
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -42,6 +42,7 @@
|
|||
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_compare_asc_desc.py
|
||||
,,y,army,./pytest.sh python3 ./test.py -f query/last/test_last.py
|
||||
,,y,army,./pytest.sh python3 ./test.py -f query/window/base.py
|
||||
,,y,army,./pytest.sh python3 ./test.py -f query/sys/tb_perf_queries_exist_test.py -N 3
|
||||
|
||||
#
|
||||
# system test
|
||||
|
|
Loading…
Reference in New Issue