Merge pull request #20571 from taosdata/feat/3.0_wxy
feat: alter database wal_retention_period/wal_retention_size
This commit is contained in:
commit
51565c9b29
|
@ -846,6 +846,8 @@ typedef struct {
|
|||
int8_t replications;
|
||||
int32_t sstTrigger;
|
||||
int32_t minRows;
|
||||
int32_t walRetentionPeriod;
|
||||
int32_t walRetentionSize;
|
||||
} SAlterDbReq;
|
||||
|
||||
int32_t tSerializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
|
||||
|
|
|
@ -2219,6 +2219,8 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
|||
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->replications) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->sstTrigger) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
|
||||
|
||||
// 1st modification
|
||||
if (tEncodeI32(&encoder, pReq->minRows) < 0) return -1;
|
||||
|
@ -2250,6 +2252,13 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
|||
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->replications) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->sstTrigger) < 0) return -1;
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->walRetentionSize) < 0) return -1;
|
||||
} else {
|
||||
pReq->walRetentionPeriod = -1;
|
||||
pReq->walRetentionSize = -1;
|
||||
}
|
||||
|
||||
// 1st modification
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
|
|
|
@ -737,6 +737,20 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
|
|||
terrno = 0;
|
||||
}
|
||||
|
||||
if (pAlter->walRetentionPeriod > TSDB_DB_MIN_WAL_RETENTION_PERIOD &&
|
||||
pAlter->walRetentionPeriod != pDb->cfg.walRetentionPeriod) {
|
||||
pDb->cfg.walRetentionPeriod = pAlter->walRetentionPeriod;
|
||||
pDb->vgVersion++;
|
||||
terrno = 0;
|
||||
}
|
||||
|
||||
if (pAlter->walRetentionSize > TSDB_DB_MIN_WAL_RETENTION_SIZE &&
|
||||
pAlter->walRetentionSize != pDb->cfg.walRetentionSize) {
|
||||
pDb->cfg.walRetentionSize = pAlter->walRetentionSize;
|
||||
pDb->vgVersion++;
|
||||
terrno = 0;
|
||||
}
|
||||
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
|
|
@ -923,9 +923,14 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
taosMemoryFree(((SDescribeStmt*)pNode)->pMeta);
|
||||
break;
|
||||
case QUERY_NODE_RESET_QUERY_CACHE_STMT: // no pointer field
|
||||
case QUERY_NODE_COMPACT_DATABASE_STMT: // no pointer field
|
||||
case QUERY_NODE_CREATE_FUNCTION_STMT: // no pointer field
|
||||
case QUERY_NODE_DROP_FUNCTION_STMT: // no pointer field
|
||||
case QUERY_NODE_COMPACT_DATABASE_STMT: {
|
||||
SCompactDatabaseStmt* pStmt = (SCompactDatabaseStmt*)pNode;
|
||||
nodesDestroyNode(pStmt->pStart);
|
||||
nodesDestroyNode(pStmt->pEnd);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_CREATE_FUNCTION_STMT: // no pointer field
|
||||
case QUERY_NODE_DROP_FUNCTION_STMT: // no pointer field
|
||||
break;
|
||||
case QUERY_NODE_CREATE_STREAM_STMT: {
|
||||
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pNode;
|
||||
|
|
|
@ -237,6 +237,18 @@ alter_db_option(A) ::= REPLICA NK_INTEGER(B).
|
|||
alter_db_option(A) ::= WAL_LEVEL NK_INTEGER(B). { A.type = DB_OPTION_WAL; A.val = B; }
|
||||
alter_db_option(A) ::= STT_TRIGGER NK_INTEGER(B). { A.type = DB_OPTION_STT_TRIGGER; A.val = B; }
|
||||
alter_db_option(A) ::= MINROWS NK_INTEGER(B). { A.type = DB_OPTION_MINROWS; A.val = B; }
|
||||
alter_db_option(A) ::= WAL_RETENTION_PERIOD NK_INTEGER(B). { A.type = DB_OPTION_WAL_RETENTION_PERIOD; A.val = B; }
|
||||
alter_db_option(A) ::= WAL_RETENTION_PERIOD NK_MINUS(B) NK_INTEGER(C). {
|
||||
SToken t = B;
|
||||
t.n = (C.z + C.n) - B.z;
|
||||
A.type = DB_OPTION_WAL_RETENTION_PERIOD; A.val = t;
|
||||
}
|
||||
alter_db_option(A) ::= WAL_RETENTION_SIZE NK_INTEGER(B). { A.type = DB_OPTION_WAL_RETENTION_SIZE; A.val = B; }
|
||||
alter_db_option(A) ::= WAL_RETENTION_SIZE NK_MINUS(B) NK_INTEGER(C). {
|
||||
SToken t = B;
|
||||
t.n = (C.z + C.n) - B.z;
|
||||
A.type = DB_OPTION_WAL_RETENTION_SIZE; A.val = t;
|
||||
}
|
||||
|
||||
%type integer_list { SNodeList* }
|
||||
%destructor integer_list { nodesDestroyList($$); }
|
||||
|
|
|
@ -925,8 +925,8 @@ SNode* createAlterDatabaseOptions(SAstCreateContext* pCxt) {
|
|||
pOptions->numOfVgroups = -1;
|
||||
pOptions->singleStable = -1;
|
||||
pOptions->schemaless = -1;
|
||||
pOptions->walRetentionPeriod = -1;
|
||||
pOptions->walRetentionSize = -1;
|
||||
pOptions->walRetentionPeriod = -2; // -1 is a valid value
|
||||
pOptions->walRetentionSize = -2; // -1 is a valid value
|
||||
pOptions->walRollPeriod = -1;
|
||||
pOptions->walSegmentSize = -1;
|
||||
pOptions->sstTrigger = -1;
|
||||
|
@ -935,7 +935,8 @@ SNode* createAlterDatabaseOptions(SAstCreateContext* pCxt) {
|
|||
return (SNode*)pOptions;
|
||||
}
|
||||
|
||||
SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOptionType type, void* pVal) {
|
||||
static SNode* setDatabaseOptionImpl(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOptionType type, void* pVal,
|
||||
bool alter) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
SDatabaseOptions* pDbOptions = (SDatabaseOptions*)pOptions;
|
||||
switch (type) {
|
||||
|
@ -986,7 +987,9 @@ SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOpti
|
|||
break;
|
||||
case DB_OPTION_REPLICA:
|
||||
pDbOptions->replica = taosStr2Int8(((SToken*)pVal)->z, NULL, 10);
|
||||
updateWalOptionsDefault(pDbOptions);
|
||||
if (!alter) {
|
||||
updateWalOptionsDefault(pDbOptions);
|
||||
}
|
||||
break;
|
||||
case DB_OPTION_STRICT:
|
||||
COPY_STRING_FORM_STR_TOKEN(pDbOptions->strictStr, (SToken*)pVal);
|
||||
|
@ -1033,16 +1036,20 @@ SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOpti
|
|||
return pOptions;
|
||||
}
|
||||
|
||||
SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOptionType type, void* pVal) {
|
||||
return setDatabaseOptionImpl(pCxt, pOptions, type, pVal, false);
|
||||
}
|
||||
|
||||
SNode* setAlterDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, SAlterOption* pAlterOption) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
switch (pAlterOption->type) {
|
||||
case DB_OPTION_KEEP:
|
||||
case DB_OPTION_RETENTIONS:
|
||||
return setDatabaseOption(pCxt, pOptions, pAlterOption->type, pAlterOption->pList);
|
||||
return setDatabaseOptionImpl(pCxt, pOptions, pAlterOption->type, pAlterOption->pList, true);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return setDatabaseOption(pCxt, pOptions, pAlterOption->type, &pAlterOption->val);
|
||||
return setDatabaseOptionImpl(pCxt, pOptions, pAlterOption->type, &pAlterOption->val, true);
|
||||
}
|
||||
|
||||
SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pDbName, SNode* pOptions) {
|
||||
|
|
|
@ -4254,6 +4254,8 @@ static void buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt,
|
|||
pReq->replications = pStmt->pOptions->replica;
|
||||
pReq->sstTrigger = pStmt->pOptions->sstTrigger;
|
||||
pReq->minRows = pStmt->pOptions->minRowsPerBlock;
|
||||
pReq->walRetentionPeriod = pStmt->pOptions->walRetentionPeriod;
|
||||
pReq->walRetentionSize = pStmt->pOptions->walRetentionSize;
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -109,7 +109,8 @@ TEST_F(ParserInitialATest, alterDnode) {
|
|||
* | WAL_LEVEL int_value -- enum 1, 2, default 1
|
||||
* | STT_TRIGGER int_value -- rang [1, 16], default 8
|
||||
* | MINROWS int_value -- rang [10, 1000], default 100
|
||||
* }
|
||||
* | WAL_RETENTION_PERIOD int_value -- rang [-1, INT32_MAX], default 0
|
||||
* | WAL_RETENTION_SIZE int_value -- rang [-1, INT32_MAX], default 0
|
||||
*/
|
||||
TEST_F(ParserInitialATest, alterDatabase) {
|
||||
useDb("root", "test");
|
||||
|
@ -135,6 +136,8 @@ TEST_F(ParserInitialATest, alterDatabase) {
|
|||
expect.replications = -1;
|
||||
expect.sstTrigger = -1;
|
||||
expect.minRows = -1;
|
||||
expect.walRetentionPeriod = -2;
|
||||
expect.walRetentionSize = -2;
|
||||
};
|
||||
auto setAlterDbBuffer = [&](int32_t buffer) { expect.buffer = buffer; };
|
||||
auto setAlterDbPageSize = [&](int32_t pageSize) { expect.pageSize = pageSize; };
|
||||
|
@ -153,6 +156,10 @@ TEST_F(ParserInitialATest, alterDatabase) {
|
|||
auto setAlterDbReplica = [&](int8_t replications) { expect.replications = replications; };
|
||||
auto setAlterDbSttTrigger = [&](int8_t sstTrigger) { expect.sstTrigger = sstTrigger; };
|
||||
auto setAlterDbMinRows = [&](int32_t minRows) { expect.minRows = minRows; };
|
||||
auto setAlterDbWalRetentionPeriod = [&](int32_t walRetentionPeriod) {
|
||||
expect.walRetentionPeriod = walRetentionPeriod;
|
||||
};
|
||||
auto setAlterDbWalRetentionSize = [&](int32_t walRetentionSize) { expect.walRetentionSize = walRetentionSize; };
|
||||
|
||||
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
|
||||
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_ALTER_DATABASE_STMT);
|
||||
|
@ -174,6 +181,8 @@ TEST_F(ParserInitialATest, alterDatabase) {
|
|||
ASSERT_EQ(req.replications, expect.replications);
|
||||
ASSERT_EQ(req.sstTrigger, expect.sstTrigger);
|
||||
ASSERT_EQ(req.minRows, expect.minRows);
|
||||
ASSERT_EQ(req.walRetentionPeriod, expect.walRetentionPeriod);
|
||||
ASSERT_EQ(req.walRetentionSize, expect.walRetentionSize);
|
||||
});
|
||||
|
||||
const int32_t MINUTE_PER_DAY = MILLISECOND_PER_DAY / MILLISECOND_PER_MINUTE;
|
||||
|
@ -189,8 +198,10 @@ TEST_F(ParserInitialATest, alterDatabase) {
|
|||
setAlterDbBuffer(16);
|
||||
setAlterDbPages(128);
|
||||
setAlterDbReplica(3);
|
||||
setAlterDbWalRetentionPeriod(10);
|
||||
setAlterDbWalRetentionSize(20);
|
||||
run("ALTER DATABASE test BUFFER 16 CACHEMODEL 'last_row' CACHESIZE 32 WAL_FSYNC_PERIOD 200 KEEP 10 PAGES 128 "
|
||||
"REPLICA 3 WAL_LEVEL 1 STT_TRIGGER 16");
|
||||
"REPLICA 3 WAL_LEVEL 1 STT_TRIGGER 16 WAL_RETENTION_PERIOD 10 WAL_RETENTION_SIZE 20");
|
||||
clearAlterDbReq();
|
||||
|
||||
initAlterDb("test");
|
||||
|
@ -290,6 +301,20 @@ TEST_F(ParserInitialATest, alterDatabase) {
|
|||
setAlterDbMinRows(1000);
|
||||
run("ALTER DATABASE test MINROWS 1000");
|
||||
clearAlterDbReq();
|
||||
|
||||
initAlterDb("test");
|
||||
setAlterDbWalRetentionPeriod(-1);
|
||||
run("ALTER DATABASE test WAL_RETENTION_PERIOD -1");
|
||||
setAlterDbWalRetentionPeriod(50);
|
||||
run("ALTER DATABASE test WAL_RETENTION_PERIOD 50");
|
||||
clearAlterDbReq();
|
||||
|
||||
initAlterDb("test");
|
||||
setAlterDbWalRetentionSize(-1);
|
||||
run("ALTER DATABASE test WAL_RETENTION_SIZE -1");
|
||||
setAlterDbWalRetentionSize(50);
|
||||
run("ALTER DATABASE test WAL_RETENTION_SIZE 50");
|
||||
clearAlterDbReq();
|
||||
}
|
||||
|
||||
TEST_F(ParserInitialATest, alterDatabaseSemanticCheck) {
|
||||
|
@ -612,7 +637,9 @@ TEST_F(ParserInitialATest, alterTable) {
|
|||
}
|
||||
ASSERT_EQ(req.isNull, expect.isNull);
|
||||
ASSERT_EQ(req.nTagVal, expect.nTagVal);
|
||||
ASSERT_EQ(memcmp(req.pTagVal, expect.pTagVal, expect.nTagVal), 0);
|
||||
if (nullptr != req.pTagVal) {
|
||||
ASSERT_EQ(memcmp(req.pTagVal, expect.pTagVal, expect.nTagVal), 0);
|
||||
}
|
||||
ASSERT_EQ(req.updateTTL, expect.updateTTL);
|
||||
ASSERT_EQ(req.newTTL, expect.newTTL);
|
||||
if (nullptr != expect.newComment) {
|
||||
|
|
Loading…
Reference in New Issue