Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize
This commit is contained in:
commit
2b7fa84e24
|
@ -1322,6 +1322,9 @@ typedef struct {
|
|||
// 1st modification
|
||||
int16_t sttTrigger;
|
||||
int32_t minRows;
|
||||
// 2nd modification
|
||||
int32_t walRetentionPeriod;
|
||||
int32_t walRetentionSize;
|
||||
} SAlterVnodeConfigReq;
|
||||
|
||||
int32_t tSerializeSAlterVnodeConfigReq(void* buf, int32_t bufLen, SAlterVnodeConfigReq* pReq);
|
||||
|
|
|
@ -2219,12 +2219,12 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
|||
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->replications) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->sstTrigger) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
|
||||
|
||||
// 1st modification
|
||||
if (tEncodeI32(&encoder, pReq->minRows) < 0) return -1;
|
||||
|
||||
// 2nd modification
|
||||
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -2252,13 +2252,6 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
|||
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->replications) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->sstTrigger) < 0) return -1;
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->walRetentionSize) < 0) return -1;
|
||||
} else {
|
||||
pReq->walRetentionPeriod = -1;
|
||||
pReq->walRetentionSize = -1;
|
||||
}
|
||||
|
||||
// 1st modification
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
|
@ -2266,6 +2259,15 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
|||
} else {
|
||||
pReq->minRows = -1;
|
||||
}
|
||||
|
||||
// 2nd modification
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->walRetentionSize) < 0) return -1;
|
||||
} else {
|
||||
pReq->walRetentionPeriod = -1;
|
||||
pReq->walRetentionSize = -1;
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -4196,7 +4198,9 @@ int32_t tSerializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeCon
|
|||
// 1st modification
|
||||
if (tEncodeI16(&encoder, pReq->sttTrigger) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->minRows) < 0) return -1;
|
||||
|
||||
// 2nd modification
|
||||
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -4235,6 +4239,14 @@ int32_t tDeserializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeC
|
|||
if (tDecodeI32(&decoder, &pReq->minRows) < 0) return -1;
|
||||
}
|
||||
|
||||
// 2n modification
|
||||
if (tDecodeIsEnd(&decoder)) {
|
||||
pReq->walRetentionPeriod = -1;
|
||||
pReq->walRetentionSize = -1;
|
||||
} else {
|
||||
if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->walRetentionSize) < 0) return -1;
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
|
|
|
@ -275,6 +275,8 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
|
|||
pOld->cfg.daysToKeep2 = pNew->cfg.daysToKeep2;
|
||||
pOld->cfg.walFsyncPeriod = pNew->cfg.walFsyncPeriod;
|
||||
pOld->cfg.walLevel = pNew->cfg.walLevel;
|
||||
pOld->cfg.walRetentionPeriod = pNew->cfg.walRetentionPeriod;
|
||||
pOld->cfg.walRetentionSize = pNew->cfg.walRetentionSize;
|
||||
pOld->cfg.strict = pNew->cfg.strict;
|
||||
pOld->cfg.cacheLast = pNew->cfg.cacheLast;
|
||||
pOld->cfg.replications = pNew->cfg.replications;
|
||||
|
|
|
@ -325,6 +325,8 @@ static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pV
|
|||
alterReq.cacheLast = pDb->cfg.cacheLast;
|
||||
alterReq.sttTrigger = pDb->cfg.sstTrigger;
|
||||
alterReq.minRows = pDb->cfg.minRows;
|
||||
alterReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
|
||||
alterReq.walRetentionSize = pDb->cfg.walRetentionSize;
|
||||
|
||||
mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);
|
||||
int32_t contLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
|
||||
|
@ -2422,4 +2424,4 @@ int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
|
|||
STimeWindow tw) {
|
||||
if (mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1477,10 +1477,11 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
|
|||
}
|
||||
|
||||
vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
|
||||
" cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d",
|
||||
" cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d walRetentionPeriod:%d "
|
||||
"walRetentionSize:%d",
|
||||
TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
|
||||
req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
|
||||
req.walFsyncPeriod, req.walLevel);
|
||||
req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize);
|
||||
|
||||
if (pVnode->config.cacheLastSize != req.cacheLastSize) {
|
||||
pVnode->config.cacheLastSize = req.cacheLastSize;
|
||||
|
@ -1510,13 +1511,21 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
|
|||
|
||||
if (pVnode->config.walCfg.fsyncPeriod != req.walFsyncPeriod) {
|
||||
pVnode->config.walCfg.fsyncPeriod = req.walFsyncPeriod;
|
||||
|
||||
walChanged = true;
|
||||
}
|
||||
|
||||
if (pVnode->config.walCfg.level != req.walLevel) {
|
||||
pVnode->config.walCfg.level = req.walLevel;
|
||||
walChanged = true;
|
||||
}
|
||||
|
||||
if (pVnode->config.walCfg.retentionPeriod != req.walRetentionPeriod) {
|
||||
pVnode->config.walCfg.retentionPeriod = req.walRetentionPeriod;
|
||||
walChanged = true;
|
||||
}
|
||||
|
||||
if (pVnode->config.walCfg.retentionSize != req.walRetentionSize) {
|
||||
pVnode->config.walCfg.retentionSize = req.walRetentionSize;
|
||||
walChanged = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -179,17 +179,23 @@ _err:
|
|||
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
|
||||
if (pWal == NULL) return TSDB_CODE_APP_ERROR;
|
||||
|
||||
if (pWal->cfg.level == pCfg->level && pWal->cfg.fsyncPeriod == pCfg->fsyncPeriod) {
|
||||
wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->cfg.vgId, pWal->cfg.level,
|
||||
pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod);
|
||||
if (pWal->cfg.level == pCfg->level && pWal->cfg.fsyncPeriod == pCfg->fsyncPeriod &&
|
||||
pWal->cfg.retentionPeriod == pCfg->retentionPeriod && pWal->cfg.retentionSize == pCfg->retentionSize) {
|
||||
wDebug("vgId:%d, walLevel:%d fsync:%d walRetentionPeriod:%d walRetentionSize:%" PRId64 " not change",
|
||||
pWal->cfg.vgId, pWal->cfg.level, pWal->cfg.fsyncPeriod, pWal->cfg.retentionPeriod, pWal->cfg.retentionSize);
|
||||
return 0;
|
||||
}
|
||||
|
||||
wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->cfg.vgId, pWal->cfg.level,
|
||||
pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod);
|
||||
wInfo("vgId:%d, change old walLevel:%d fsync:%d walRetentionPeriod:%d walRetentionSize:%" PRId64
|
||||
", new walLevel:%d fsync:%d walRetentionPeriod:%d walRetentionSize:%" PRId64,
|
||||
pWal->cfg.vgId, pWal->cfg.level, pWal->cfg.fsyncPeriod, pWal->cfg.retentionPeriod, pWal->cfg.retentionSize,
|
||||
pCfg->level, pCfg->fsyncPeriod, pCfg->retentionPeriod, pCfg->retentionSize);
|
||||
|
||||
pWal->cfg.level = pCfg->level;
|
||||
pWal->cfg.fsyncPeriod = pCfg->fsyncPeriod;
|
||||
pWal->cfg.retentionPeriod = pCfg->retentionPeriod;
|
||||
pWal->cfg.retentionSize = pCfg->retentionSize;
|
||||
|
||||
pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
|
||||
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
|
||||
|
||||
|
|
|
@ -327,7 +327,7 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
wDebug("vgId:%d, wal check remove file %" PRId64 "(file size %" PRId64 " close ts %" PRId64
|
||||
"), new tot size %" PRId64,
|
||||
pWal->cfg.vgId, iter->firstVer, iter->fileSize, iter->closeTs, newTotSize);
|
||||
if (((pWal->cfg.retentionSize == 0) || (pWal->cfg.retentionSize != -1 && newTotSize > pWal->cfg.retentionSize)) ||
|
||||
if ((pWal->cfg.retentionSize != -1 && pWal->cfg.retentionSize != 0 && newTotSize > pWal->cfg.retentionSize) ||
|
||||
((pWal->cfg.retentionPeriod == 0) || (pWal->cfg.retentionPeriod != -1 && iter->closeTs != -1 &&
|
||||
iter->closeTs + pWal->cfg.retentionPeriod < ts))) {
|
||||
// delete according to file size or close time
|
||||
|
|
Loading…
Reference in New Issue