From 951ff0071a6af4d10d56affaadd6668347073a52 Mon Sep 17 00:00:00 2001 From: AlexDuan <417921451@qq.com> Date: Tue, 24 Aug 2021 11:36:35 +0800 Subject: [PATCH] add switch and elastic block equal totalBlocks 1/3 --- src/common/inc/tglobal.h | 2 ++ src/common/src/tglobal.c | 13 +++++++++++++ src/tsdb/src/tsdbBuffer.c | 29 ++++++++++------------------- src/tsdb/src/tsdbHealth.c | 34 ++-------------------------------- 4 files changed, 27 insertions(+), 51 deletions(-) diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 4b8347ead0..e53c898718 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -221,6 +221,8 @@ extern uint32_t maxRange; extern uint32_t curRange; extern char Compressor[]; #endif +// long query +extern int8_t tsDeathLockKillQuery; typedef struct { char dir[TSDB_FILENAME_LEN]; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index f169b07bb2..c7725dde08 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -276,6 +276,9 @@ uint32_t curRange = 100; // range char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR #endif +// long query death-lock +int8_t tsDeathLockKillQuery = 0; + int32_t (*monStartSystemFp)() = NULL; void (*monStopSystemFp)() = NULL; void (*monExecuteSQLFp)(char *sql) = NULL; @@ -1647,6 +1650,16 @@ static void doInitGlobalConfig(void) { taosInitConfigOption(cfg); #endif + // enable kill long query + cfg.option = "deathLockKillQuery"; + cfg.ptr = &tsDeathLockKillQuery; + cfg.valType = TAOS_CFG_VTYPE_INT8; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 1; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); } void taosInitGlobalCfg() { diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index 2ba41dca2a..ec385ef83e 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -123,22 +123,20 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { STsdbBufPool *pBufPool = pRepo->pPool; while (POOL_IS_EMPTY(pBufPool)) { - tsdbWarn("vgId:%d Pool empty,nBufBlocks=%d nElastic=%d nRecycle=%d", REPO_ID(pRepo), pBufPool->nBufBlocks, pBufPool->nElasticBlocks, pBufPool->nRecycleBlocks); - // supply new Block - if(tsdbInsertNewBlock(pRepo) > 0) { - tsdbWarn("vgId:%d Insert new block to solve.", REPO_ID(pRepo)); - break; - } else { - // no newBlock, kill query free - if(!tsdbUrgeQueryFree(pRepo)) { - tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo)); + if(tsDeathLockKillQuery) { + // supply new Block + if(tsdbInsertNewBlock(pRepo) > 0) { + tsdbWarn("vgId:%d Insert elastic new block to solve.", REPO_ID(pRepo)); + break; + } else { + // no newBlock, kill query free + if(!tsdbUrgeQueryFree(pRepo)) + tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo)); } } pRepo->repoLocked = false; - tsdbDebug("vgId:%d wait for new block...", REPO_ID(pRepo)); pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex)); - tsdbDebug("vgId:%d waited new block ok.", REPO_ID(pRepo)); pRepo->repoLocked = true; } @@ -160,7 +158,7 @@ STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) { STsdbBufBlock *pBufBlock = (STsdbBufBlock *)malloc(sizeof(*pBufBlock) + bufBlockSize); if (pBufBlock == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; + return NULL; } pBufBlock->blockId = 0; @@ -168,10 +166,6 @@ STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) { pBufBlock->remain = bufBlockSize; return pBufBlock; - -_err: - tsdbFreeBufBlock(pBufBlock); - return NULL; } void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } @@ -216,10 +210,7 @@ void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic tsdbFreeBufBlock(pBufBlock); free(pNode); if(bELastic) - { pPool->nElasticBlocks--; - printf(" elastic block reduce one ok. current blocks=%d \n", pPool->nElasticBlocks); - } else pPool->nBufBlocks--; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbHealth.c b/src/tsdb/src/tsdbHealth.c index dddf40d963..cc6bae02b0 100644 --- a/src/tsdb/src/tsdbHealth.c +++ b/src/tsdb/src/tsdbHealth.c @@ -31,7 +31,7 @@ int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) { STsdbBufPool *pPool = pRepo->pPool; int32_t cnt = 0; - if(tsdbIdleMemEnough() && tsdbAllowNewBlock(pRepo)) { + if(tsdbAllowNewBlock(pRepo)) { STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize); if (pBufBlock) { if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) { @@ -67,37 +67,9 @@ bool tsdbUrgeQueryFree(STsdbRepo * pRepo) { return hTimer != NULL; } -bool tsdbIdleMemEnough() { - // TODO config to taos.cfg - int32_t lowestRate = 5; // below 10% idle memory, return not enough memory - float memoryUsedMB = 0; - float memoryAvailMB; - - if (!taosGetSysMemory(&memoryUsedMB)) { - tsdbWarn("tsdbHealth get memory error, return false."); - return true; - } - - if(memoryUsedMB > tsTotalMemoryMB || tsTotalMemoryMB == 0) { - tsdbWarn("tsdbHealth used memory(%d MB) large total memory(%d MB), return false.", (int)memoryUsedMB, (int)tsTotalMemoryMB); - return true; - } - - memoryAvailMB = (float)tsTotalMemoryMB - memoryUsedMB; - int32_t rate = (int32_t)(memoryAvailMB/tsTotalMemoryMB * 100); - if(rate < lowestRate){ - tsdbWarn("tsdbHealth real rate :%d less than lowest rate:%d, so return false.", rate, lowestRate); - return false; - } - - return true; -} - bool tsdbAllowNewBlock(STsdbRepo* pRepo) { - //TODO config to taos.cfg - int32_t nMaxElastic = 1; + int32_t nMaxElastic = pRepo->config.totalBlocks/3; STsdbBufPool* pPool = pRepo->pPool; - printf("tsdbAllowNewBlock nElasticBlock(%d) MaxElasticBlocks(%d)\n", pPool->nElasticBlocks, nMaxElastic); if(pPool->nElasticBlocks >= nMaxElastic) { tsdbWarn("tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", pPool->nElasticBlocks, nMaxElastic); return false; @@ -106,8 +78,6 @@ bool tsdbAllowNewBlock(STsdbRepo* pRepo) { } bool tsdbNoProblem(STsdbRepo* pRepo) { - if(!tsdbIdleMemEnough()) - return false; if(listNEles(pRepo->pPool->bufBlockList) == 0) return false; return true;