Merge pull request #15603 from taosdata/fix/TD-17592
fix: new alter keep[012], walLevel, walFsyncPeriod, cacheLast
This commit is contained in:
commit
a8176b6493
|
@ -135,6 +135,7 @@ int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* p
|
||||||
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||||
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||||
void* pMemRef);
|
void* pMemRef);
|
||||||
|
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
||||||
|
|
||||||
// tq
|
// tq
|
||||||
int tqInit();
|
int tqInit();
|
||||||
|
|
|
@ -15,11 +15,8 @@
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
static int tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg);
|
int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) {
|
||||||
|
STsdbKeepCfg *pKeepCfg = &pTsdb->keepCfg;
|
||||||
// implementation
|
|
||||||
|
|
||||||
static int tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg) {
|
|
||||||
pKeepCfg->precision = pCfg->precision;
|
pKeepCfg->precision = pCfg->precision;
|
||||||
pKeepCfg->days = pCfg->days;
|
pKeepCfg->days = pCfg->days;
|
||||||
pKeepCfg->keep0 = pCfg->keep0;
|
pKeepCfg->keep0 = pCfg->keep0;
|
||||||
|
@ -56,7 +53,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
|
||||||
pTsdb->pVnode = pVnode;
|
pTsdb->pVnode = pVnode;
|
||||||
taosThreadRwlockInit(&pTsdb->rwLock, NULL);
|
taosThreadRwlockInit(&pTsdb->rwLock, NULL);
|
||||||
if (!pKeepCfg) {
|
if (!pKeepCfg) {
|
||||||
tsdbSetKeepCfg(&pTsdb->keepCfg, &pVnode->config.tsdbCfg);
|
tsdbSetKeepCfg(pTsdb, &pVnode->config.tsdbCfg);
|
||||||
} else {
|
} else {
|
||||||
memcpy(&pTsdb->keepCfg, pKeepCfg, sizeof(STsdbKeepCfg));
|
memcpy(&pTsdb->keepCfg, pKeepCfg, sizeof(STsdbKeepCfg));
|
||||||
}
|
}
|
||||||
|
|
|
@ -297,8 +297,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
vTrace("message in fetch queue is processing");
|
vTrace("message in fetch queue is processing");
|
||||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META ||
|
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||||
pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) &&
|
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||||
!vnodeIsLeader(pVnode)) {
|
!vnodeIsLeader(pVnode)) {
|
||||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -977,6 +977,9 @@ static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, vo
|
||||||
|
|
||||||
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
SAlterVnodeReq alterReq = {0};
|
SAlterVnodeReq alterReq = {0};
|
||||||
|
bool walChanged = false;
|
||||||
|
bool tsdbChanged = false;
|
||||||
|
|
||||||
if (tDeserializeSAlterVnodeReq(pReq, len, &alterReq) != 0) {
|
if (tDeserializeSAlterVnodeReq(pReq, len, &alterReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return TSDB_CODE_INVALID_MSG;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
|
@ -986,9 +989,54 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
|
||||||
alterReq.cacheLastSize);
|
alterReq.cacheLastSize);
|
||||||
if (pVnode->config.cacheLastSize != alterReq.cacheLastSize) {
|
if (pVnode->config.cacheLastSize != alterReq.cacheLastSize) {
|
||||||
pVnode->config.cacheLastSize = alterReq.cacheLastSize;
|
pVnode->config.cacheLastSize = alterReq.cacheLastSize;
|
||||||
// TODO: save config
|
|
||||||
tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
|
tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pVnode->config.cacheLast != alterReq.cacheLast) {
|
||||||
|
pVnode->config.cacheLast = alterReq.cacheLast;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pVnode->config.walCfg.fsyncPeriod != alterReq.walFsyncPeriod) {
|
||||||
|
pVnode->config.walCfg.fsyncPeriod = alterReq.walFsyncPeriod;
|
||||||
|
|
||||||
|
walChanged = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pVnode->config.walCfg.level != alterReq.walLevel) {
|
||||||
|
pVnode->config.walCfg.level = alterReq.walLevel;
|
||||||
|
|
||||||
|
walChanged = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pVnode->config.tsdbCfg.keep0 != alterReq.daysToKeep0) {
|
||||||
|
pVnode->config.tsdbCfg.keep0 = alterReq.daysToKeep0;
|
||||||
|
if (!VND_IS_RSMA(pVnode)) {
|
||||||
|
tsdbChanged = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pVnode->config.tsdbCfg.keep1 != alterReq.daysToKeep1) {
|
||||||
|
pVnode->config.tsdbCfg.keep1 = alterReq.daysToKeep1;
|
||||||
|
if (!VND_IS_RSMA(pVnode)) {
|
||||||
|
tsdbChanged = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pVnode->config.tsdbCfg.keep2 != alterReq.daysToKeep2) {
|
||||||
|
pVnode->config.tsdbCfg.keep2 = alterReq.daysToKeep2;
|
||||||
|
if (!VND_IS_RSMA(pVnode)) {
|
||||||
|
tsdbChanged = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (walChanged) {
|
||||||
|
walAlter(pVnode->pWal, &pVnode->config.walCfg);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsdbChanged) {
|
||||||
|
tsdbSetKeepCfg(pVnode->pTsdb, &pVnode->config.tsdbCfg);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue