diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 074ff20f22..4d62164df9 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -71,6 +71,11 @@ struct STsdbRepo { uint8_t state; STsdbCfg config; + + STsdbCfg save_config; // save apply config + bool config_changed; // config changed flag + pthread_mutex_t save_mutex; // protect save config + STsdbAppH appH; STsdbStat stat; STsdbMeta* tsdbMeta; diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 9e8e4acd7e..7a1b225849 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -112,6 +112,24 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) { return 0; } +static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { + pRepo->config_changed = false; + STsdbCfg * pSaveCfg = &pRepo->save_config; + + pRepo->config.compression = pRepo->save_config.compression; + pRepo->config.keep = pRepo->save_config.keep; + pRepo->config.keep1 = pRepo->save_config.keep1; + pRepo->config.keep2 = pRepo->save_config.keep2; + pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow; + pRepo->config.update = pRepo->save_config.update; + + tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d), update(%d)", + REPO_ID(pRepo), + pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, + pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->update); + +} + static void *tsdbLoopCommit(void *arg) { SCommitQueue *pQueue = &tsCommitQueue; SListNode * pNode = NULL; @@ -138,6 +156,13 @@ static void *tsdbLoopCommit(void *arg) { pRepo = ((SCommitReq *)pNode->data)->pRepo; + // check if need to apply new config + if (pRepo->config_changed) { + pthread_mutex_lock(&pRepo->save_mutex); + tsdbApplyRepoConfig(pRepo); + pthread_mutex_unlock(&pRepo->save_mutex); + } + tsdbCommitData(pRepo); listNodeFree(pNode); } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 99929f3542..16804c7cf9 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -203,6 +203,70 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { // TODO: think about multithread cases + if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1; + + STsdbCfg * pRCfg = &repo->config; + + ASSERT(pRCfg->tsdbId == pCfg->tsdbId); + ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize); + ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile); + ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock); + ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock); + ASSERT(pRCfg->precision == pCfg->precision); + + bool configChanged = false; + if (pRCfg->compression != pCfg->compression) { + configChanged = true; + } + if (pRCfg->keep != pCfg->keep) { + configChanged = true; + } + if (pRCfg->keep1 != pCfg->keep1) { + configChanged = true; + } + if (pRCfg->keep2 != pCfg->keep2) { + configChanged = true; + } + if (pRCfg->cacheLastRow != pCfg->cacheLastRow) { + configChanged = true; + } + if (pRCfg->update != pCfg->update) { + configChanged = true; + } + + if (!configChanged) { + tsdbError("vgId:%d no config changed", REPO_ID(repo)); + } + + int code = pthread_mutex_lock(&repo->save_mutex); + if (code != 0) { + tsdbError("vgId:%d failed to lock tsdb save config mutex since %s", REPO_ID(repo), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + + STsdbCfg * pSaveCfg = &repo->save_config; + *pSaveCfg = repo->config; + + pSaveCfg->compression = pCfg->compression; + pSaveCfg->keep = pCfg->keep; + pSaveCfg->keep1 = pCfg->keep1; + pSaveCfg->keep2 = pCfg->keep2; + pSaveCfg->cacheLastRow = pCfg->cacheLastRow; + pSaveCfg->update = pCfg->update; + + tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d), update(%d)", + REPO_ID(repo), + pRCfg->compression, pRCfg->keep, pRCfg->keep1,pRCfg->keep2, + pRCfg->cacheLastRow, pRCfg->update); + tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d), update(%d)", + REPO_ID(repo), + pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, + pSaveCfg->cacheLastRow, pSaveCfg->update); + + repo->config_changed = true; + + pthread_mutex_unlock(&repo->save_mutex); return 0; #if 0 STsdbRepo *pRepo = (STsdbRepo *)repo; @@ -474,6 +538,14 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { return NULL; } + code = pthread_mutex_init(&(pRepo->save_mutex), NULL); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + tsdbFreeRepo(pRepo); + return NULL; + } + pRepo->config_changed = false; + code = tsem_init(&(pRepo->readyToCommit), 0, 1); if (code != 0) { code = errno;