diff --git a/src/tsdb/inc/tsdbBuffer.h b/src/tsdb/inc/tsdbBuffer.h index 17919c284e..4b650d3993 100644 --- a/src/tsdb/inc/tsdbBuffer.h +++ b/src/tsdb/inc/tsdbBuffer.h @@ -29,6 +29,7 @@ typedef struct { int tBufBlocks; int nBufBlocks; int nRecycleBlocks; + int nElasticBlocks; int64_t index; SList* bufBlockList; } STsdbBufPool; @@ -41,7 +42,7 @@ int tsdbOpenBufPool(STsdbRepo* pRepo); void tsdbCloseBufPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks); -void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode); +void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic); // health cite STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize); diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index a8d800208c..06a98323ab 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -67,6 +67,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) { pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB pPool->tBufBlocks = pCfg->totalBlocks; pPool->nBufBlocks = 0; + pPool->nElasticBlocks = 0; pPool->index = 0; pPool->nRecycleBlocks = 0; @@ -199,10 +200,13 @@ err: return err; } -void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) { +void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic) { STsdbBufBlock *pBufBlock = NULL; tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock)); tsdbFreeBufBlock(pBufBlock); free(pNode); - pPool->nBufBlocks--; + if(bELastic) + pPool->nElasticBlocks--; + else + pPool->nBufBlocks--; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbHealth.c b/src/tsdb/src/tsdbHealth.c index e5e95e405f..32f6f4fdb8 100644 --- a/src/tsdb/src/tsdbHealth.c +++ b/src/tsdb/src/tsdbHealth.c @@ -32,16 +32,13 @@ int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) { if(tsdbIdleMemEnough() && tsdbAllowNewBlock(pRepo)) { STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize); if (pBufBlock) { - if (tsdbLockRepo(pRepo) >= 0) { if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) { // append error tsdbFreeBufBlock(pBufBlock); } else { - pPool->nRecycleBlocks ++; + pPool->nElasticBlocks ++; cnt ++ ; } - tsdbUnlockRepo(pRepo); - } } } return cnt; @@ -91,13 +88,11 @@ bool tsdbIdleMemEnough() { bool tsdbAllowNewBlock(STsdbRepo* pRepo) { //TODO config to taos.cfg - int32_t nElasticBlocks = 10; + int32_t nMaxElastic = 3; STsdbBufPool* pPool = pRepo->pPool; - int32_t nOverBlocks = pPool->nBufBlocks - pRepo->config.totalBlocks; - if(nOverBlocks > nElasticBlocks) { - tsdbWarn("tsdbHealth allowNewBlock forbid. nOverBlocks(%d) > nElasticBlocks(%d)", nOverBlocks, nElasticBlocks); + if(pPool->nElasticBlocks >= nMaxElastic) { + tsdbWarn("tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", pPool->nElasticBlocks, nMaxElastic); return false; } - return true; } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index e766d97a97..02c9946704 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -99,17 +99,22 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { STsdbBufPool *pBufPool = pRepo->pPool; SListNode *pNode = NULL; - bool recycleBlocks = pBufPool->nRecycleBlocks > 0; + bool addNew = false; if (tsdbLockRepo(pRepo) < 0) return -1; while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) { if (pBufPool->nRecycleBlocks > 0) { - tsdbRecycleBufferBlock(pBufPool, pNode); + tsdbRecycleBufferBlock(pBufPool, pNode, false); pBufPool->nRecycleBlocks -= 1; } else { - tdListAppendNode(pBufPool->bufBlockList, pNode); + if(pBufPool->nElasticBlocks > 0 && listNEles(pBufPool->bufBlockList) > 0) { + tsdbRecycleBufferBlock(pBufPool, pNode, true); + } else { + tdListAppendNode(pBufPool->bufBlockList, pNode); + addNew = true; + } } } - if (!recycleBlocks) { + if (addNew) { int code = pthread_cond_signal(&pBufPool->poolNotEmpty); if (code != 0) { if (tsdbUnlockRepo(pRepo) < 0) return -1;