feat: dispatch auto compact

This commit is contained in:
kailixu 2024-12-05 17:15:23 +08:00
parent a6e86619b0
commit 03e051ee63
5 changed files with 72 additions and 42 deletions

View File

@ -111,13 +111,15 @@ typedef struct SDatabaseOptions {
int8_t s3Compact; int8_t s3Compact;
int8_t withArbitrator; int8_t withArbitrator;
// for auto-compact // for auto-compact
int8_t compactTimeOffset; // hours int8_t compactTimeOffset; // hours
int32_t compactInterval; // minutes int32_t compactInterval; // minutes
int32_t compactStartTime; // minutes int32_t compactStartTime; // minutes
int32_t compactEndTime; // minutes int32_t compactEndTime; // minutes
SValueNode* pCompactTimeOffsetNode; SValueNode* pCompactTimeOffsetNode;
SValueNode* pCompactIntervalNode; SValueNode* pCompactIntervalNode;
SNodeList* pCompactTimeRangeList; SNodeList* pCompactTimeRangeList;
// for cache
SDbCfgInfo* pDbCfg;
} SDatabaseOptions; } SDatabaseOptions;
typedef struct SCreateDatabaseStmt { typedef struct SCreateDatabaseStmt {

View File

@ -932,7 +932,7 @@ static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pD
sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey, sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey,
tw->ekey); tw->ekey);
} }
auditRecord(pReq, pMnode->clusterId, "autoCompactDB", name.dbname, "", sql, sqlLen); auditRecord(NULL, pMnode->clusterId, "autoCompactDB", name.dbname, "", sql, sqlLen);
return 0; return 0;
} }
@ -992,7 +992,7 @@ static int32_t mndCompactDispatch(SRpcMsg *pReq) {
.skey = convertTimePrecision(curMs + compactStartTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision), .skey = convertTimePrecision(curMs + compactStartTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision),
.ekey = convertTimePrecision(curMs + compactEndTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)}; .ekey = convertTimePrecision(curMs + compactEndTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)};
if ((code = mndCompactDb(pMnode, pReq, pDb, tw, NULL)) == 0) { if ((code = mndCompactDb(pMnode, NULL, pDb, tw, NULL)) == 0) {
mInfo("db:%p,%s, succeed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64 mInfo("db:%p,%s, succeed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
"m, end:%" PRIi64 "m, offset:%" PRIi8 "h", "m, end:%" PRIi64 "m, offset:%" PRIi8 "h",
pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime, pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,

View File

@ -1154,26 +1154,28 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
code = 0; code = 0;
} }
if (pAlter->compactInterval != TSDB_DEFAULT_COMPACT_INTERVAL && pAlter->compactInterval != pDb->cfg.compactInterval) { if (pAlter->compactInterval >= TSDB_DEFAULT_COMPACT_INTERVAL && pAlter->compactInterval != pDb->cfg.compactInterval) {
pDb->cfg.compactInterval = pAlter->compactInterval; pDb->cfg.compactInterval = pAlter->compactInterval;
pDb->vgVersion++; pDb->vgVersion++;
code = 0; code = 0;
} }
if (pAlter->compactStartTime != TSDB_DEFAULT_COMPACT_START_TIME && if (pAlter->compactStartTime != pDb->cfg.compactStartTime &&
pAlter->compactStartTime != pDb->cfg.compactStartTime) { (pAlter->compactStartTime == TSDB_DEFAULT_COMPACT_START_TIME ||
pAlter->compactStartTime <= -pDb->cfg.daysPerFile)) {
pDb->cfg.compactStartTime = pAlter->compactStartTime; pDb->cfg.compactStartTime = pAlter->compactStartTime;
pDb->vgVersion++; pDb->vgVersion++;
code = 0; code = 0;
} }
if (pAlter->compactEndTime != TSDB_DEFAULT_COMPACT_END_TIME && pAlter->compactEndTime != pDb->cfg.compactEndTime) { if (pAlter->compactEndTime != pDb->cfg.compactEndTime &&
(pAlter->compactEndTime == TSDB_DEFAULT_COMPACT_END_TIME || pAlter->compactEndTime <= -pDb->cfg.daysPerFile)) {
pDb->cfg.compactEndTime = pAlter->compactEndTime; pDb->cfg.compactEndTime = pAlter->compactEndTime;
pDb->vgVersion++; pDb->vgVersion++;
code = 0; code = 0;
} }
if (pAlter->compactTimeOffset != TSDB_DEFAULT_COMPACT_TIME_OFFSET && if (pAlter->compactTimeOffset >= TSDB_MIN_COMPACT_TIME_OFFSET &&
pAlter->compactTimeOffset != pDb->cfg.compactTimeOffset) { pAlter->compactTimeOffset != pDb->cfg.compactTimeOffset) {
pDb->cfg.compactTimeOffset = pAlter->compactTimeOffset; pDb->cfg.compactTimeOffset = pAlter->compactTimeOffset;
pDb->vgVersion++; pDb->vgVersion++;

View File

@ -1327,14 +1327,20 @@ void nodesDestroyNode(SNode* pNode) {
} }
break; break;
} }
case QUERY_NODE_CREATE_DATABASE_STMT: case QUERY_NODE_CREATE_DATABASE_STMT: {
nodesDestroyNode((SNode*)((SCreateDatabaseStmt*)pNode)->pOptions); SDatabaseOptions* pOptions = ((SCreateDatabaseStmt*)pNode)->pOptions;
taosMemoryFreeClear(pOptions->pDbCfg);
nodesDestroyNode((SNode*)pOptions);
break; break;
}
case QUERY_NODE_DROP_DATABASE_STMT: // no pointer field case QUERY_NODE_DROP_DATABASE_STMT: // no pointer field
break; break;
case QUERY_NODE_ALTER_DATABASE_STMT: case QUERY_NODE_ALTER_DATABASE_STMT: {
nodesDestroyNode((SNode*)((SAlterDatabaseStmt*)pNode)->pOptions); SDatabaseOptions* pOptions = ((SAlterDatabaseStmt*)pNode)->pOptions;
taosMemoryFreeClear(pOptions->pDbCfg);
nodesDestroyNode((SNode*)pOptions);
break; break;
}
case QUERY_NODE_FLUSH_DATABASE_STMT: // no pointer field case QUERY_NODE_FLUSH_DATABASE_STMT: // no pointer field
case QUERY_NODE_TRIM_DATABASE_STMT: // no pointer field case QUERY_NODE_TRIM_DATABASE_STMT: // no pointer field
break; break;

View File

@ -8045,6 +8045,16 @@ static int32_t checkDbTbPrefixSuffixOptions(STranslateContext* pCxt, int32_t tbP
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE int32_t translateGetDbCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo** ppDbCfg) {
if (*ppDbCfg) {
return TSDB_CODE_SUCCESS;
}
if (!(*ppDbCfg = taosMemoryCalloc(1, sizeof(SDbCfgInfo)))) {
return terrno;
}
return getDBCfg(pCxt, pDbName, *ppDbCfg);
}
static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions) { static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions) {
int32_t daysPerFile = pOptions->daysPerFile; int32_t daysPerFile = pOptions->daysPerFile;
int32_t s3KeepLocal = pOptions->s3KeepLocal; int32_t s3KeepLocal = pOptions->s3KeepLocal;
@ -8052,13 +8062,9 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa
if (-1 == daysPerFile && -1 == daysToKeep0) { if (-1 == daysPerFile && -1 == daysToKeep0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (-1 == daysPerFile || -1 == daysToKeep0) { } else if (-1 == daysPerFile || -1 == daysToKeep0) {
SDbCfgInfo dbCfg = {0}; TAOS_CHECK_RETURN(translateGetDbCfg(pCxt, pDbName, &pOptions->pDbCfg));
int32_t code = getDBCfg(pCxt, pDbName, &dbCfg); daysPerFile = (-1 == daysPerFile ? pOptions->pDbCfg->daysPerFile : daysPerFile);
if (TSDB_CODE_SUCCESS != code) { daysToKeep0 = (-1 == daysToKeep0 ? pOptions->pDbCfg->daysToKeep0 : daysToKeep0);
return code;
}
daysPerFile = (-1 == daysPerFile ? dbCfg.daysPerFile : daysPerFile);
daysToKeep0 = (-1 == daysToKeep0 ? dbCfg.daysToKeep0 : daysToKeep0);
} }
if (daysPerFile > daysToKeep0 / 3) { if (daysPerFile > daysToKeep0 / 3) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION, return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
@ -8082,9 +8088,11 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t checkDbCompactIntervalOption(STranslateContext* pCxt, SDatabaseOptions* pOptions) { static int32_t checkDbCompactIntervalOption(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions) {
int32_t code = 0; int32_t code = 0;
int64_t interval = 0; int64_t interval = 0;
int32_t keep2 = pOptions->keep[2];
if (NULL != pOptions->pCompactIntervalNode) { if (NULL != pOptions->pCompactIntervalNode) {
if (DEAL_RES_ERROR == translateValue(pCxt, pOptions->pCompactIntervalNode)) { if (DEAL_RES_ERROR == translateValue(pCxt, pOptions->pCompactIntervalNode)) {
return pCxt->errCode; return pCxt->errCode;
@ -8099,18 +8107,25 @@ static int32_t checkDbCompactIntervalOption(STranslateContext* pCxt, SDatabaseOp
} }
interval = getBigintFromValueNode(pOptions->pCompactIntervalNode); interval = getBigintFromValueNode(pOptions->pCompactIntervalNode);
if (interval != 0) { if (interval != 0) {
code = checkDbRangeOption(pCxt, "compact_interval", interval, TSDB_MIN_COMPACT_INTERVAL, if (keep2 == -1) { // alter db
pOptions->keep[2]); TAOS_CHECK_RETURN(translateGetDbCfg(pCxt, pDbName, &pOptions->pDbCfg));
keep2 = pOptions->pDbCfg->daysToKeep2;
}
code = checkDbRangeOption(pCxt, "compact_interval", interval, TSDB_MIN_COMPACT_INTERVAL, keep2);
} }
} else if (pOptions->compactInterval != 0) { } else if (pOptions->compactInterval > 0) {
interval = pOptions->compactInterval * 1440; // convert to minutes interval = pOptions->compactInterval * 1440; // convert to minutes
code = checkDbRangeOption(pCxt, "compact_interval", interval, TSDB_MIN_COMPACT_INTERVAL, pOptions->keep[2]); if (keep2 == -1) { // alter db
TAOS_CHECK_RETURN(translateGetDbCfg(pCxt, pDbName, &pOptions->pDbCfg));
keep2 = pOptions->pDbCfg->daysToKeep2;
}
code = checkDbRangeOption(pCxt, "compact_interval", interval, TSDB_MIN_COMPACT_INTERVAL, keep2);
} }
if (code == 0) pOptions->compactInterval = interval; if (code == 0) pOptions->compactInterval = interval;
return code; return code;
} }
static int32_t checkDbCompactTimeRangeOption(STranslateContext* pCxt, SDatabaseOptions* pOptions) { static int32_t checkDbCompactTimeRangeOption(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions) {
if (NULL == pOptions->pCompactTimeRangeList) { if (NULL == pOptions->pCompactTimeRangeList) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -8155,17 +8170,23 @@ static int32_t checkDbCompactTimeRangeOption(STranslateContext* pCxt, SDatabaseO
"Invalid option compact_time_range: %dm,%dm, start time should be less than end time", "Invalid option compact_time_range: %dm,%dm, start time should be less than end time",
pOptions->compactStartTime, pOptions->compactEndTime); pOptions->compactStartTime, pOptions->compactEndTime);
} }
if (pOptions->compactStartTime < -pOptions->keep[2] || pOptions->compactStartTime > -pOptions->daysPerFile) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION, int32_t keep2 = pOptions->keep[2];
"Invalid option compact_time_range: %dm, start_time should be in range: [%" PRIi64 int32_t days = pOptions->daysPerFile;
"m, %dm]", if (keep2 == -1 || days == -1) { // alter db
pOptions->compactStartTime, -pOptions->keep[2], -pOptions->daysPerFile); TAOS_CHECK_RETURN(translateGetDbCfg(pCxt, pDbName, &pOptions->pDbCfg));
keep2 = pOptions->pDbCfg->daysToKeep2;
days = pOptions->pDbCfg->daysPerFile;
} }
if (pOptions->compactEndTime < -pOptions->keep[2] || pOptions->compactEndTime > -pOptions->daysPerFile) { if (pOptions->compactStartTime < -keep2 || pOptions->compactStartTime > -days) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION, return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option compact_time_range: %dm, end time should be in range: [%" PRIi64 "Invalid option compact_time_range: %dm, start_time should be in range: [%dm, %dm]",
"m, %dm]", pOptions->compactStartTime, -keep2, -days);
pOptions->compactEndTime, -pOptions->keep[2], -pOptions->daysPerFile); }
if (pOptions->compactEndTime < -keep2 || pOptions->compactEndTime > -days) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option compact_time_range: %dm, end time should be in range: [%dm, %dm]",
pOptions->compactEndTime, -keep2, -days);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -8304,10 +8325,10 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
code = checkDbRangeOption(pCxt, "s3_compact", pOptions->s3Compact, TSDB_MIN_S3_COMPACT, TSDB_MAX_S3_COMPACT); code = checkDbRangeOption(pCxt, "s3_compact", pOptions->s3Compact, TSDB_MIN_S3_COMPACT, TSDB_MAX_S3_COMPACT);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkDbCompactIntervalOption(pCxt, pOptions); code = checkDbCompactIntervalOption(pCxt, pDbName, pOptions);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkDbCompactTimeRangeOption(pCxt, pOptions); code = checkDbCompactTimeRangeOption(pCxt, pDbName, pOptions);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkDbCompactTimeOffsetOption(pCxt, pOptions); code = checkDbCompactTimeOffsetOption(pCxt, pOptions);
@ -8557,9 +8578,8 @@ static int32_t buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStm
static int32_t translateAlterDatabase(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt) { static int32_t translateAlterDatabase(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt) {
if (pStmt->pOptions->walLevel == 0) { if (pStmt->pOptions->walLevel == 0) {
SDbCfgInfo dbCfg = {0}; TAOS_CHECK_RETURN(translateGetDbCfg(pCxt, pStmt->dbName, &pStmt->pOptions->pDbCfg));
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg); if (pStmt->pOptions->pDbCfg->replications > 1) {
if (TSDB_CODE_SUCCESS == code && dbCfg.replications > 1) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION, return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option, wal_level 0 should be used with replica 1"); "Invalid option, wal_level 0 should be used with replica 1");
} }