[TD-3963]change totalBlocks config
This commit is contained in:
parent
968917f312
commit
344aff2087
|
@ -28,8 +28,9 @@ typedef struct {
|
|||
int bufBlockSize;
|
||||
int tBufBlocks;
|
||||
int nBufBlocks;
|
||||
int nRecycleBlocks;
|
||||
int64_t index;
|
||||
SList* bufBlockList;
|
||||
SList* bufBlockList;
|
||||
} STsdbBufPool;
|
||||
|
||||
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
|
||||
|
@ -39,5 +40,7 @@ void tsdbFreeBufPool(STsdbBufPool* pBufPool);
|
|||
int tsdbOpenBufPool(STsdbRepo* pRepo);
|
||||
void tsdbCloseBufPool(STsdbRepo* pRepo);
|
||||
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
|
||||
void tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
|
||||
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode);
|
||||
|
||||
#endif /* _TD_TSDB_BUFFER_H_ */
|
|
@ -70,6 +70,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) {
|
|||
pPool->tBufBlocks = pCfg->totalBlocks;
|
||||
pPool->nBufBlocks = 0;
|
||||
pPool->index = 0;
|
||||
pPool->nRecycleBlocks = 0;
|
||||
|
||||
for (int i = 0; i < pCfg->totalBlocks; i++) {
|
||||
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
|
||||
|
@ -156,4 +157,42 @@ _err:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
|
||||
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
|
||||
|
||||
void tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
|
||||
if (oldTotalBlocks == pRepo->config.totalBlocks) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (tsdbLockRepo(pRepo) < 0) return;
|
||||
STsdbBufPool* pPool = pRepo->pPool;
|
||||
|
||||
if (pRepo->config.totalBlocks > oldTotalBlocks) {
|
||||
for (int i = 0; i < pRepo->config.totalBlocks - oldTotalBlocks; i++) {
|
||||
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
|
||||
if (pBufBlock == NULL) goto err;
|
||||
|
||||
if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
|
||||
tsdbFreeBufBlock(pBufBlock);
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto err;
|
||||
}
|
||||
|
||||
pPool->nBufBlocks++;
|
||||
}
|
||||
pthread_cond_signal(&pPool->poolNotEmpty);
|
||||
} else {
|
||||
pPool->nRecycleBlocks = oldTotalBlocks - pRepo->config.totalBlocks;
|
||||
}
|
||||
|
||||
err:
|
||||
tsdbUnlockRepo(pRepo);
|
||||
}
|
||||
|
||||
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) {
|
||||
STsdbBufBlock *pBufBlock = NULL;
|
||||
tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock));
|
||||
tsdbFreeBufBlock(pBufBlock);
|
||||
free(pNode);
|
||||
pPool->nBufBlocks--;
|
||||
}
|
|
@ -116,17 +116,21 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
|
|||
pRepo->config_changed = false;
|
||||
STsdbCfg * pSaveCfg = &pRepo->save_config;
|
||||
|
||||
int32_t oldTotalBlocks = pRepo->config.totalBlocks;
|
||||
|
||||
pRepo->config.compression = pRepo->save_config.compression;
|
||||
pRepo->config.keep = pRepo->save_config.keep;
|
||||
pRepo->config.keep1 = pRepo->save_config.keep1;
|
||||
pRepo->config.keep2 = pRepo->save_config.keep2;
|
||||
pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow;
|
||||
//pRepo->config.update = pRepo->save_config.update;
|
||||
pRepo->config.totalBlocks = pRepo->save_config.totalBlocks;
|
||||
|
||||
tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d)",
|
||||
tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)",
|
||||
REPO_ID(pRepo),
|
||||
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
|
||||
pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow);
|
||||
pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks);
|
||||
|
||||
tsdbExpendPool(pRepo, oldTotalBlocks);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -230,6 +230,9 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
|
|||
if (pRCfg->cacheLastRow != pCfg->cacheLastRow) {
|
||||
configChanged = true;
|
||||
}
|
||||
if (pRCfg->totalBlocks != pCfg->totalBlocks) {
|
||||
configChanged = true;
|
||||
}
|
||||
|
||||
if (!configChanged) {
|
||||
tsdbError("vgId:%d no config changed", REPO_ID(repo));
|
||||
|
@ -250,15 +253,16 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
|
|||
pSaveCfg->keep1 = pCfg->keep1;
|
||||
pSaveCfg->keep2 = pCfg->keep2;
|
||||
pSaveCfg->cacheLastRow = pCfg->cacheLastRow;
|
||||
pSaveCfg->totalBlocks = pCfg->totalBlocks;
|
||||
|
||||
tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d)",
|
||||
tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)",
|
||||
REPO_ID(repo),
|
||||
pRCfg->compression, pRCfg->keep, pRCfg->keep1,pRCfg->keep2,
|
||||
pRCfg->cacheLastRow);
|
||||
tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d)",
|
||||
pRCfg->cacheLastRow, pRCfg->totalBlocks);
|
||||
tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)",
|
||||
REPO_ID(repo),
|
||||
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
|
||||
pSaveCfg->cacheLastRow);
|
||||
pSaveCfg->cacheLastRow,pSaveCfg->totalBlocks);
|
||||
|
||||
repo->config_changed = true;
|
||||
|
||||
|
|
|
@ -98,17 +98,26 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
|
|||
STsdbBufPool *pBufPool = pRepo->pPool;
|
||||
|
||||
SListNode *pNode = NULL;
|
||||
bool recycleBlocks = pBufPool->nRecycleBlocks > 0;
|
||||
if (tsdbLockRepo(pRepo) < 0) return -1;
|
||||
while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) {
|
||||
tdListAppendNode(pBufPool->bufBlockList, pNode);
|
||||
if (pBufPool->nRecycleBlocks > 0) {
|
||||
tsdbRecycleBufferBlock(pBufPool, pNode);
|
||||
pBufPool->nRecycleBlocks -= 1;
|
||||
} else {
|
||||
tdListAppendNode(pBufPool->bufBlockList, pNode);
|
||||
}
|
||||
}
|
||||
int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
|
||||
if (code != 0) {
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code));
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
return -1;
|
||||
if (!recycleBlocks) {
|
||||
int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
|
||||
if (code != 0) {
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code));
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
|
||||
for (int i = 0; i < pMemTable->maxTables; i++) {
|
||||
|
|
Loading…
Reference in New Issue