From 073d7b1f0841efbfd8c2098f0a25ad625cb3947f Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 29 Apr 2021 08:40:28 +0000 Subject: [PATCH 1/5] [TD-3963]tsdbRepo config hot change --- src/tsdb/inc/tsdbint.h | 5 +++ src/tsdb/src/tsdbCommitQueue.c | 25 ++++++++++++ src/tsdb/src/tsdbMain.c | 72 ++++++++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+) diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 074ff20f22..4d62164df9 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -71,6 +71,11 @@ struct STsdbRepo { uint8_t state; STsdbCfg config; + + STsdbCfg save_config; // save apply config + bool config_changed; // config changed flag + pthread_mutex_t save_mutex; // protect save config + STsdbAppH appH; STsdbStat stat; STsdbMeta* tsdbMeta; diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 9e8e4acd7e..7a1b225849 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -112,6 +112,24 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) { return 0; } +static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { + pRepo->config_changed = false; + STsdbCfg * pSaveCfg = &pRepo->save_config; + + pRepo->config.compression = pRepo->save_config.compression; + pRepo->config.keep = pRepo->save_config.keep; + pRepo->config.keep1 = pRepo->save_config.keep1; + pRepo->config.keep2 = pRepo->save_config.keep2; + pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow; + pRepo->config.update = pRepo->save_config.update; + + tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d), update(%d)", + REPO_ID(pRepo), + pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, + pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->update); + +} + static void *tsdbLoopCommit(void *arg) { SCommitQueue *pQueue = &tsCommitQueue; SListNode * pNode = NULL; @@ -138,6 +156,13 @@ static void *tsdbLoopCommit(void *arg) { pRepo = ((SCommitReq *)pNode->data)->pRepo; + // check if need to apply new config + if (pRepo->config_changed) { + pthread_mutex_lock(&pRepo->save_mutex); + tsdbApplyRepoConfig(pRepo); + pthread_mutex_unlock(&pRepo->save_mutex); + } + tsdbCommitData(pRepo); listNodeFree(pNode); } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 99929f3542..16804c7cf9 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -203,6 +203,70 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { // TODO: think about multithread cases + if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1; + + STsdbCfg * pRCfg = &repo->config; + + ASSERT(pRCfg->tsdbId == pCfg->tsdbId); + ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize); + ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile); + ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock); + ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock); + ASSERT(pRCfg->precision == pCfg->precision); + + bool configChanged = false; + if (pRCfg->compression != pCfg->compression) { + configChanged = true; + } + if (pRCfg->keep != pCfg->keep) { + configChanged = true; + } + if (pRCfg->keep1 != pCfg->keep1) { + configChanged = true; + } + if (pRCfg->keep2 != pCfg->keep2) { + configChanged = true; + } + if (pRCfg->cacheLastRow != pCfg->cacheLastRow) { + configChanged = true; + } + if (pRCfg->update != pCfg->update) { + configChanged = true; + } + + if (!configChanged) { + tsdbError("vgId:%d no config changed", REPO_ID(repo)); + } + + int code = pthread_mutex_lock(&repo->save_mutex); + if (code != 0) { + tsdbError("vgId:%d failed to lock tsdb save config mutex since %s", REPO_ID(repo), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + + STsdbCfg * pSaveCfg = &repo->save_config; + *pSaveCfg = repo->config; + + pSaveCfg->compression = pCfg->compression; + pSaveCfg->keep = pCfg->keep; + pSaveCfg->keep1 = pCfg->keep1; + pSaveCfg->keep2 = pCfg->keep2; + pSaveCfg->cacheLastRow = pCfg->cacheLastRow; + pSaveCfg->update = pCfg->update; + + tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d), update(%d)", + REPO_ID(repo), + pRCfg->compression, pRCfg->keep, pRCfg->keep1,pRCfg->keep2, + pRCfg->cacheLastRow, pRCfg->update); + tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d), update(%d)", + REPO_ID(repo), + pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, + pSaveCfg->cacheLastRow, pSaveCfg->update); + + repo->config_changed = true; + + pthread_mutex_unlock(&repo->save_mutex); return 0; #if 0 STsdbRepo *pRepo = (STsdbRepo *)repo; @@ -474,6 +538,14 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { return NULL; } + code = pthread_mutex_init(&(pRepo->save_mutex), NULL); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + tsdbFreeRepo(pRepo); + return NULL; + } + pRepo->config_changed = false; + code = tsem_init(&(pRepo->readyToCommit), 0, 1); if (code != 0) { code = errno; From 968917f31264d380d4d2d6e6dd9f3607bf5333ab Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 6 May 2021 09:57:22 +0800 Subject: [PATCH 2/5] [TD-3963]remove change update config --- src/tsdb/src/tsdbCommitQueue.c | 6 ++--- src/tsdb/src/tsdbMain.c | 12 ++++------ src/vnode/src/vnodeMain.c | 40 ++++++++++++++++++---------------- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 7a1b225849..2e2cc74159 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -121,12 +121,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { pRepo->config.keep1 = pRepo->save_config.keep1; pRepo->config.keep2 = pRepo->save_config.keep2; pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow; - pRepo->config.update = pRepo->save_config.update; + //pRepo->config.update = pRepo->save_config.update; - tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d), update(%d)", + tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d)", REPO_ID(pRepo), pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, - pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->update); + pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow); } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 16804c7cf9..c5f1052b63 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -230,9 +230,6 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { if (pRCfg->cacheLastRow != pCfg->cacheLastRow) { configChanged = true; } - if (pRCfg->update != pCfg->update) { - configChanged = true; - } if (!configChanged) { tsdbError("vgId:%d no config changed", REPO_ID(repo)); @@ -253,16 +250,15 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { pSaveCfg->keep1 = pCfg->keep1; pSaveCfg->keep2 = pCfg->keep2; pSaveCfg->cacheLastRow = pCfg->cacheLastRow; - pSaveCfg->update = pCfg->update; - tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d), update(%d)", + tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d)", REPO_ID(repo), pRCfg->compression, pRCfg->keep, pRCfg->keep1,pRCfg->keep2, - pRCfg->cacheLastRow, pRCfg->update); - tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d), update(%d)", + pRCfg->cacheLastRow); + tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d)", REPO_ID(repo), pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, - pSaveCfg->cacheLastRow, pSaveCfg->update); + pSaveCfg->cacheLastRow); repo->config_changed = true; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 0921c5ce48..e471fb7e18 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -170,29 +170,31 @@ static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) { vDebug("vgId:%d, tsdbchanged:%d syncchanged:%d while alter vnode", pVnode->vgId, tsdbCfgChanged, syncCfgChanged); - if (/*tsdbCfgChanged || */syncCfgChanged) { + if (tsdbCfgChanged || syncCfgChanged) { // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS // dbCfgVersion can be corrected by status msg - if (!vnodeSetUpdatingStatus(pVnode)) { - vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId); - pVnode->dbCfgVersion = dbCfgVersion; - pVnode->vgCfgVersion = vgCfgVersion; - pVnode->syncCfg = syncCfg; - pVnode->tsdbCfg = tsdbCfg; - return TSDB_CODE_SUCCESS; + if (syncCfgChanged) { + if (!vnodeSetUpdatingStatus(pVnode)) { + vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId); + pVnode->dbCfgVersion = dbCfgVersion; + pVnode->vgCfgVersion = vgCfgVersion; + pVnode->syncCfg = syncCfg; + pVnode->tsdbCfg = tsdbCfg; + return TSDB_CODE_SUCCESS; + } + + code = syncReconfig(pVnode->sync, &pVnode->syncCfg); + if (code != TSDB_CODE_SUCCESS) { + pVnode->dbCfgVersion = dbCfgVersion; + pVnode->vgCfgVersion = vgCfgVersion; + pVnode->syncCfg = syncCfg; + pVnode->tsdbCfg = tsdbCfg; + vnodeSetReadyStatus(pVnode); + return code; + } } - code = syncReconfig(pVnode->sync, &pVnode->syncCfg); - if (code != TSDB_CODE_SUCCESS) { - pVnode->dbCfgVersion = dbCfgVersion; - pVnode->vgCfgVersion = vgCfgVersion; - pVnode->syncCfg = syncCfg; - pVnode->tsdbCfg = tsdbCfg; - vnodeSetReadyStatus(pVnode); - return code; - } - - if (pVnode->tsdb) { + if (tsdbCfgChanged && pVnode->tsdb) { code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); if (code != TSDB_CODE_SUCCESS) { pVnode->dbCfgVersion = dbCfgVersion; From 344aff2087fdffef25179178177667aa7a46fc01 Mon Sep 17 00:00:00 2001 From: lichuang Date: Fri, 7 May 2021 10:36:51 +0800 Subject: [PATCH 3/5] [TD-3963]change totalBlocks config --- src/tsdb/inc/tsdbBuffer.h | 5 ++++- src/tsdb/src/tsdbBuffer.c | 41 +++++++++++++++++++++++++++++++++- src/tsdb/src/tsdbCommitQueue.c | 10 ++++++--- src/tsdb/src/tsdbMain.c | 12 ++++++---- src/tsdb/src/tsdbMemTable.c | 23 +++++++++++++------ 5 files changed, 75 insertions(+), 16 deletions(-) diff --git a/src/tsdb/inc/tsdbBuffer.h b/src/tsdb/inc/tsdbBuffer.h index 414ace0009..c6aabeaab9 100644 --- a/src/tsdb/inc/tsdbBuffer.h +++ b/src/tsdb/inc/tsdbBuffer.h @@ -28,8 +28,9 @@ typedef struct { int bufBlockSize; int tBufBlocks; int nBufBlocks; + int nRecycleBlocks; int64_t index; - SList* bufBlockList; + SList* bufBlockList; } STsdbBufPool; #define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold @@ -39,5 +40,7 @@ void tsdbFreeBufPool(STsdbBufPool* pBufPool); int tsdbOpenBufPool(STsdbRepo* pRepo); void tsdbCloseBufPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); +void tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks); +void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode); #endif /* _TD_TSDB_BUFFER_H_ */ \ No newline at end of file diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index 1798a21b99..266f2a45d8 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -70,6 +70,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) { pPool->tBufBlocks = pCfg->totalBlocks; pPool->nBufBlocks = 0; pPool->index = 0; + pPool->nRecycleBlocks = 0; for (int i = 0; i < pCfg->totalBlocks; i++) { STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize); @@ -156,4 +157,42 @@ _err: return NULL; } -static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } \ No newline at end of file +static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } + +void tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { + if (oldTotalBlocks == pRepo->config.totalBlocks) { + return; + } + + if (tsdbLockRepo(pRepo) < 0) return; + STsdbBufPool* pPool = pRepo->pPool; + + if (pRepo->config.totalBlocks > oldTotalBlocks) { + for (int i = 0; i < pRepo->config.totalBlocks - oldTotalBlocks; i++) { + STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize); + if (pBufBlock == NULL) goto err; + + if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) { + tsdbFreeBufBlock(pBufBlock); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto err; + } + + pPool->nBufBlocks++; + } + pthread_cond_signal(&pPool->poolNotEmpty); + } else { + pPool->nRecycleBlocks = oldTotalBlocks - pRepo->config.totalBlocks; + } + +err: + tsdbUnlockRepo(pRepo); +} + +void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) { + STsdbBufBlock *pBufBlock = NULL; + tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock)); + tsdbFreeBufBlock(pBufBlock); + free(pNode); + pPool->nBufBlocks--; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 2e2cc74159..86712db957 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -116,17 +116,21 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { pRepo->config_changed = false; STsdbCfg * pSaveCfg = &pRepo->save_config; + int32_t oldTotalBlocks = pRepo->config.totalBlocks; + pRepo->config.compression = pRepo->save_config.compression; pRepo->config.keep = pRepo->save_config.keep; pRepo->config.keep1 = pRepo->save_config.keep1; pRepo->config.keep2 = pRepo->save_config.keep2; pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow; - //pRepo->config.update = pRepo->save_config.update; + pRepo->config.totalBlocks = pRepo->save_config.totalBlocks; - tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d)", + tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)", REPO_ID(pRepo), pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, - pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow); + pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks); + + tsdbExpendPool(pRepo, oldTotalBlocks); } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index c5f1052b63..fd02a3c8b9 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -230,6 +230,9 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { if (pRCfg->cacheLastRow != pCfg->cacheLastRow) { configChanged = true; } + if (pRCfg->totalBlocks != pCfg->totalBlocks) { + configChanged = true; + } if (!configChanged) { tsdbError("vgId:%d no config changed", REPO_ID(repo)); @@ -250,15 +253,16 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { pSaveCfg->keep1 = pCfg->keep1; pSaveCfg->keep2 = pCfg->keep2; pSaveCfg->cacheLastRow = pCfg->cacheLastRow; + pSaveCfg->totalBlocks = pCfg->totalBlocks; - tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d)", + tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)", REPO_ID(repo), pRCfg->compression, pRCfg->keep, pRCfg->keep1,pRCfg->keep2, - pRCfg->cacheLastRow); - tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d)", + pRCfg->cacheLastRow, pRCfg->totalBlocks); + tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)", REPO_ID(repo), pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, - pSaveCfg->cacheLastRow); + pSaveCfg->cacheLastRow,pSaveCfg->totalBlocks); repo->config_changed = true; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 20ec426018..776cc07d2f 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -98,17 +98,26 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { STsdbBufPool *pBufPool = pRepo->pPool; SListNode *pNode = NULL; + bool recycleBlocks = pBufPool->nRecycleBlocks > 0; if (tsdbLockRepo(pRepo) < 0) return -1; while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) { - tdListAppendNode(pBufPool->bufBlockList, pNode); + if (pBufPool->nRecycleBlocks > 0) { + tsdbRecycleBufferBlock(pBufPool, pNode); + pBufPool->nRecycleBlocks -= 1; + } else { + tdListAppendNode(pBufPool->bufBlockList, pNode); + } } - int code = pthread_cond_signal(&pBufPool->poolNotEmpty); - if (code != 0) { - if (tsdbUnlockRepo(pRepo) < 0) return -1; - tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code)); - terrno = TAOS_SYSTEM_ERROR(code); - return -1; + if (!recycleBlocks) { + int code = pthread_cond_signal(&pBufPool->poolNotEmpty); + if (code != 0) { + if (tsdbUnlockRepo(pRepo) < 0) return -1; + tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code)); + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } } + if (tsdbUnlockRepo(pRepo) < 0) return -1; for (int i = 0; i < pMemTable->maxTables; i++) { From 8e7381d70e56afcb17cb161bf34040e68890d742 Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 10 May 2021 10:14:44 +0800 Subject: [PATCH 4/5] free lastRow while cacheLastRow config reset --- src/tsdb/src/tsdbMemTable.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 776cc07d2f..c6fcf55686 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -967,6 +967,15 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) { STsdbCfg *pCfg = &pRepo->config; + // if cacheLastRow config has been reset, free the lastRow + if (!pCfg->cacheLastRow && pTable->lastRow != NULL) { + taosTZfree(pTable->lastRow); + TSDB_WLOCK_TABLE(pTable); + pTable->lastRow = NULL; + pTable->lastKey = TSKEY_INITIAL_VAL; + TSDB_WUNLOCK_TABLE(pTable); + } + if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) { if (pCfg->cacheLastRow || pTable->lastRow != NULL) { SDataRow nrow = pTable->lastRow; From d9127c05017c022f474a6174fa4950f1674ac6e6 Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 12 May 2021 14:41:33 +0800 Subject: [PATCH 5/5] [TD-3963]handle oom in tsdbExpendPool --- src/tsdb/inc/tsdbBuffer.h | 2 +- src/tsdb/src/tsdbBuffer.c | 10 +++++++--- src/tsdb/src/tsdbCommitQueue.c | 6 +++++- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/tsdb/inc/tsdbBuffer.h b/src/tsdb/inc/tsdbBuffer.h index c6aabeaab9..4e18ac711a 100644 --- a/src/tsdb/inc/tsdbBuffer.h +++ b/src/tsdb/inc/tsdbBuffer.h @@ -40,7 +40,7 @@ void tsdbFreeBufPool(STsdbBufPool* pBufPool); int tsdbOpenBufPool(STsdbRepo* pRepo); void tsdbCloseBufPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); -void tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks); +int tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks); void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode); #endif /* _TD_TSDB_BUFFER_H_ */ \ No newline at end of file diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index 266f2a45d8..429ea8e0ce 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -159,12 +159,14 @@ _err: static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } -void tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { +int tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { if (oldTotalBlocks == pRepo->config.totalBlocks) { - return; + return TSDB_CODE_SUCCESS; } - if (tsdbLockRepo(pRepo) < 0) return; + int err = TSDB_CODE_SUCCESS; + + if (tsdbLockRepo(pRepo) < 0) return terrno; STsdbBufPool* pPool = pRepo->pPool; if (pRepo->config.totalBlocks > oldTotalBlocks) { @@ -175,6 +177,7 @@ void tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) { tsdbFreeBufBlock(pBufBlock); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + err = TSDB_CODE_TDB_OUT_OF_MEMORY; goto err; } @@ -187,6 +190,7 @@ void tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { err: tsdbUnlockRepo(pRepo); + return err; } void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) { diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 86712db957..e753a3211e 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -130,7 +130,11 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks); - tsdbExpendPool(pRepo, oldTotalBlocks); + int err = tsdbExpendPool(pRepo, oldTotalBlocks); + if (!TAOS_SUCCEEDED(err)) { + tsdbError("vgId:%d expand pool from %d to %d fail,reason:%s", + REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err)); + } }