fix(wal/level): enable alter replica from fake walLevel 1
This commit is contained in:
parent
bee01f9dba
commit
568799a6a3
|
@ -60,6 +60,7 @@ typedef struct {
|
||||||
EWalType level; // wal level
|
EWalType level; // wal level
|
||||||
int32_t encryptAlgorithm;
|
int32_t encryptAlgorithm;
|
||||||
char encryptKey[ENCRYPT_KEY_LEN + 1];
|
char encryptKey[ENCRYPT_KEY_LEN + 1];
|
||||||
|
int8_t clearFiles;
|
||||||
} SWalCfg;
|
} SWalCfg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -737,7 +737,9 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
.diskPrimary = pVnode->diskPrimary,
|
.diskPrimary = pVnode->diskPrimary,
|
||||||
};
|
};
|
||||||
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
|
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
|
||||||
vmCloseVnode(pMgmt, pVnode, false);
|
|
||||||
|
bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
|
||||||
|
vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal);
|
||||||
|
|
||||||
int32_t diskPrimary = wrapperCfg.diskPrimary;
|
int32_t diskPrimary = wrapperCfg.diskPrimary;
|
||||||
char path[TSDB_FILENAME_LEN] = {0};
|
char path[TSDB_FILENAME_LEN] = {0};
|
||||||
|
|
|
@ -52,6 +52,7 @@ extern const SVnodeCfg vnodeCfgDefault;
|
||||||
int32_t vnodeInit(int32_t nthreads);
|
int32_t vnodeInit(int32_t nthreads);
|
||||||
void vnodeCleanup();
|
void vnodeCleanup();
|
||||||
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs);
|
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs);
|
||||||
|
bool vnodeShouldRemoveWal(SVnode *pVnode);
|
||||||
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs);
|
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs);
|
||||||
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq,
|
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq,
|
||||||
int32_t diskPrimary, STfs *pTfs);
|
int32_t diskPrimary, STfs *pTfs);
|
||||||
|
|
|
@ -13,9 +13,9 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "tglobal.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
#include "tglobal.h"
|
|
||||||
|
|
||||||
const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
|
const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
|
||||||
.dbname = "",
|
.dbname = "",
|
||||||
|
@ -47,6 +47,7 @@ const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
|
||||||
.segSize = 0,
|
.segSize = 0,
|
||||||
.retentionSize = -1,
|
.retentionSize = -1,
|
||||||
.level = TAOS_WAL_WRITE,
|
.level = TAOS_WAL_WRITE,
|
||||||
|
.clearFiles = 0,
|
||||||
},
|
},
|
||||||
.hashBegin = 0,
|
.hashBegin = 0,
|
||||||
.hashEnd = 0,
|
.hashEnd = 0,
|
||||||
|
@ -142,6 +143,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
||||||
if (tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "wal.clearFiles", pCfg->walCfg.clearFiles) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "wal.encryptAlgorithm", pCfg->walCfg.encryptAlgorithm) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "wal.encryptAlgorithm", pCfg->walCfg.encryptAlgorithm) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "tdbEncryptAlgorithm", pCfg->tdbEncryptAlgorithm) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "tdbEncryptAlgorithm", pCfg->tdbEncryptAlgorithm) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "sstTrigger", pCfg->sttTrigger) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "sstTrigger", pCfg->sttTrigger) < 0) return -1;
|
||||||
|
@ -253,8 +255,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
||||||
if (tsEncryptKey[0] == 0) {
|
if (tsEncryptKey[0] == 0) {
|
||||||
terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
|
terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
} else {
|
||||||
else{
|
|
||||||
strncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
|
strncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -273,6 +274,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
||||||
if (code < 0) return -1;
|
if (code < 0) return -1;
|
||||||
tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level, code);
|
tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level, code);
|
||||||
if (code < 0) return -1;
|
if (code < 0) return -1;
|
||||||
|
tjsonGetNumberValue(pJson, "wal.clearFiles", pCfg->walCfg.clearFiles, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
tjsonGetNumberValue(pJson, "wal.encryptAlgorithm", pCfg->walCfg.encryptAlgorithm, code);
|
tjsonGetNumberValue(pJson, "wal.encryptAlgorithm", pCfg->walCfg.encryptAlgorithm, code);
|
||||||
if (code < 0) return -1;
|
if (code < 0) return -1;
|
||||||
#if defined(TD_ENTERPRISE)
|
#if defined(TD_ENTERPRISE)
|
||||||
|
@ -280,8 +283,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
||||||
if (tsEncryptKey[0] == 0) {
|
if (tsEncryptKey[0] == 0) {
|
||||||
terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
|
terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
} else {
|
||||||
else{
|
|
||||||
strncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
|
strncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -293,8 +295,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
||||||
if (tsEncryptKey[0] == 0) {
|
if (tsEncryptKey[0] == 0) {
|
||||||
terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
|
terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
} else {
|
||||||
else{
|
|
||||||
strncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
|
strncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,6 +81,8 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool vnodeShouldRemoveWal(SVnode *pVnode) { return pVnode->config.walCfg.clearFiles == 1; }
|
||||||
|
|
||||||
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs) {
|
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs) {
|
||||||
SVnodeInfo info = {0};
|
SVnodeInfo info = {0};
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
|
@ -129,6 +131,12 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t
|
||||||
}
|
}
|
||||||
pCfg->changeVersion = pReq->changeVersion;
|
pCfg->changeVersion = pReq->changeVersion;
|
||||||
|
|
||||||
|
if (info.config.walCfg.clearFiles) {
|
||||||
|
info.config.walCfg.clearFiles = 0;
|
||||||
|
|
||||||
|
vInfo("vgId:%d, reset wal clearFiles", pReq->vgId);
|
||||||
|
}
|
||||||
|
|
||||||
vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", pReq->vgId,
|
vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", pReq->vgId,
|
||||||
pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion);
|
pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion);
|
||||||
|
|
||||||
|
@ -491,8 +499,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
|
||||||
VNODE_METRIC_TAG_NAME_DNODE_ID, VNODE_METRIC_TAG_NAME_DNODE_EP,
|
VNODE_METRIC_TAG_NAME_DNODE_ID, VNODE_METRIC_TAG_NAME_DNODE_EP,
|
||||||
VNODE_METRIC_TAG_NAME_VGROUP_ID, VNODE_METRIC_TAG_NAME_USERNAME,
|
VNODE_METRIC_TAG_NAME_VGROUP_ID, VNODE_METRIC_TAG_NAME_USERNAME,
|
||||||
VNODE_METRIC_TAG_NAME_RESULT};
|
VNODE_METRIC_TAG_NAME_RESULT};
|
||||||
counter = taos_counter_new(VNODE_METRIC_SQL_COUNT, "counter for insert sql",
|
counter = taos_counter_new(VNODE_METRIC_SQL_COUNT, "counter for insert sql", label_count, sample_labels);
|
||||||
label_count, sample_labels);
|
|
||||||
vInfo("vgId:%d, new metric:%p", TD_VID(pVnode), counter);
|
vInfo("vgId:%d, new metric:%p", TD_VID(pVnode), counter);
|
||||||
if (taos_collector_registry_register_metric(counter) == 1) {
|
if (taos_collector_registry_register_metric(counter) == 1) {
|
||||||
taos_counter_destroy(counter);
|
taos_counter_destroy(counter);
|
||||||
|
|
|
@ -2021,6 +2021,9 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pRe
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->config.walCfg.level != req.walLevel) {
|
if (pVnode->config.walCfg.level != req.walLevel) {
|
||||||
|
if (pVnode->config.walCfg.level == 0) {
|
||||||
|
pVnode->config.walCfg.clearFiles = 1;
|
||||||
|
}
|
||||||
pVnode->config.walCfg.level = req.walLevel;
|
pVnode->config.walCfg.level = req.walLevel;
|
||||||
walChanged = true;
|
walChanged = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -6898,6 +6898,19 @@ static int32_t checkDbTbPrefixSuffixOptions(STranslateContext* pCxt, int32_t tbP
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions) {
|
static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions) {
|
||||||
|
if (pOptions->replica > 1) {
|
||||||
|
SDbCfgInfo dbCfg = {0};
|
||||||
|
int32_t code = getDBCfg(pCxt, pDbName, &dbCfg);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pOptions->walLevel == 0 || dbCfg.walLevel == 0) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
|
||||||
|
"Invalid option, wal_level 0 should be used with replica 1");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t daysPerFile = pOptions->daysPerFile;
|
int32_t daysPerFile = pOptions->daysPerFile;
|
||||||
int32_t s3KeepLocal = pOptions->s3KeepLocal;
|
int32_t s3KeepLocal = pOptions->s3KeepLocal;
|
||||||
int64_t daysToKeep0 = pOptions->keep[0];
|
int64_t daysToKeep0 = pOptions->keep[0];
|
||||||
|
@ -6926,11 +6939,6 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa
|
||||||
"Invalid option, with_arbitrator should be used with replica 2");
|
"Invalid option, with_arbitrator should be used with replica 2");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOptions->replica > 1 && pOptions->walLevel == 0) {
|
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
|
|
||||||
"Invalid option, wal_level 0 should be used with replica 1");
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue