nElasticBlocks replace nBufBlocks

This commit is contained in:
AlexDuan 2021-08-19 21:02:43 +08:00
parent 79a18b2422
commit 0608e9086b
4 changed files with 21 additions and 16 deletions

View File

@ -29,6 +29,7 @@ typedef struct {
int tBufBlocks; int tBufBlocks;
int nBufBlocks; int nBufBlocks;
int nRecycleBlocks; int nRecycleBlocks;
int nElasticBlocks;
int64_t index; int64_t index;
SList* bufBlockList; SList* bufBlockList;
} STsdbBufPool; } STsdbBufPool;
@ -41,7 +42,7 @@ int tsdbOpenBufPool(STsdbRepo* pRepo);
void tsdbCloseBufPool(STsdbRepo* pRepo); void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks); int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode); void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic);
// health cite // health cite
STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize); STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);

View File

@ -67,6 +67,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) {
pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB
pPool->tBufBlocks = pCfg->totalBlocks; pPool->tBufBlocks = pCfg->totalBlocks;
pPool->nBufBlocks = 0; pPool->nBufBlocks = 0;
pPool->nElasticBlocks = 0;
pPool->index = 0; pPool->index = 0;
pPool->nRecycleBlocks = 0; pPool->nRecycleBlocks = 0;
@ -199,10 +200,13 @@ err:
return err; return err;
} }
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) { void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic) {
STsdbBufBlock *pBufBlock = NULL; STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock)); tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock));
tsdbFreeBufBlock(pBufBlock); tsdbFreeBufBlock(pBufBlock);
free(pNode); free(pNode);
pPool->nBufBlocks--; if(bELastic)
pPool->nElasticBlocks--;
else
pPool->nBufBlocks--;
} }

View File

@ -32,16 +32,13 @@ int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
if(tsdbIdleMemEnough() && tsdbAllowNewBlock(pRepo)) { if(tsdbIdleMemEnough() && tsdbAllowNewBlock(pRepo)) {
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize); STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
if (pBufBlock) { if (pBufBlock) {
if (tsdbLockRepo(pRepo) >= 0) {
if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) { if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
// append error // append error
tsdbFreeBufBlock(pBufBlock); tsdbFreeBufBlock(pBufBlock);
} else { } else {
pPool->nRecycleBlocks ++; pPool->nElasticBlocks ++;
cnt ++ ; cnt ++ ;
} }
tsdbUnlockRepo(pRepo);
}
} }
} }
return cnt; return cnt;
@ -91,13 +88,11 @@ bool tsdbIdleMemEnough() {
bool tsdbAllowNewBlock(STsdbRepo* pRepo) { bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
//TODO config to taos.cfg //TODO config to taos.cfg
int32_t nElasticBlocks = 10; int32_t nMaxElastic = 3;
STsdbBufPool* pPool = pRepo->pPool; STsdbBufPool* pPool = pRepo->pPool;
int32_t nOverBlocks = pPool->nBufBlocks - pRepo->config.totalBlocks; if(pPool->nElasticBlocks >= nMaxElastic) {
if(nOverBlocks > nElasticBlocks) { tsdbWarn("tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", pPool->nElasticBlocks, nMaxElastic);
tsdbWarn("tsdbHealth allowNewBlock forbid. nOverBlocks(%d) > nElasticBlocks(%d)", nOverBlocks, nElasticBlocks);
return false; return false;
} }
return true; return true;
} }

View File

@ -99,17 +99,22 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
STsdbBufPool *pBufPool = pRepo->pPool; STsdbBufPool *pBufPool = pRepo->pPool;
SListNode *pNode = NULL; SListNode *pNode = NULL;
bool recycleBlocks = pBufPool->nRecycleBlocks > 0; bool addNew = false;
if (tsdbLockRepo(pRepo) < 0) return -1; if (tsdbLockRepo(pRepo) < 0) return -1;
while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) { while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) {
if (pBufPool->nRecycleBlocks > 0) { if (pBufPool->nRecycleBlocks > 0) {
tsdbRecycleBufferBlock(pBufPool, pNode); tsdbRecycleBufferBlock(pBufPool, pNode, false);
pBufPool->nRecycleBlocks -= 1; pBufPool->nRecycleBlocks -= 1;
} else { } else {
tdListAppendNode(pBufPool->bufBlockList, pNode); if(pBufPool->nElasticBlocks > 0 && listNEles(pBufPool->bufBlockList) > 0) {
tsdbRecycleBufferBlock(pBufPool, pNode, true);
} else {
tdListAppendNode(pBufPool->bufBlockList, pNode);
addNew = true;
}
} }
} }
if (!recycleBlocks) { if (addNew) {
int code = pthread_cond_signal(&pBufPool->poolNotEmpty); int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
if (code != 0) { if (code != 0) {
if (tsdbUnlockRepo(pRepo) < 0) return -1; if (tsdbUnlockRepo(pRepo) < 0) return -1;