feat: dispatch db auto compact
This commit is contained in:
parent
4707149369
commit
f4fc423f34
|
@ -423,10 +423,10 @@ typedef struct {
|
||||||
int8_t s3Compact;
|
int8_t s3Compact;
|
||||||
int8_t withArbitrator;
|
int8_t withArbitrator;
|
||||||
int8_t encryptAlgorithm;
|
int8_t encryptAlgorithm;
|
||||||
int8_t compactTimeOffset;
|
int8_t compactTimeOffset; // hour
|
||||||
int32_t compactInterval;
|
int32_t compactInterval; // minute
|
||||||
int32_t compactStartTime;
|
int32_t compactStartTime; // minute
|
||||||
int32_t compactEndTime;
|
int32_t compactEndTime; // minute
|
||||||
} SDbCfg;
|
} SDbCfg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -891,28 +891,103 @@ static void mndCompactPullup(SMnode *pMnode) {
|
||||||
taosArrayDestroy(pArray);
|
taosArrayDestroy(pArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndCompactDispatchReq(SMnode *pMnode, SDnodeInfo *pDnodeInfo, int32_t contLen, void *pCont) {
|
||||||
|
// send grant status to dnode
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.pCont = pCont, .contLen = contLen, .msgType = TDMT_MND_GRANT, .info.ahandle = (void *)0x818, .info.noResp = 1};
|
||||||
|
|
||||||
|
SEpSet epSet = {.numOfEps = 1};
|
||||||
|
tstrncpy(epSet.eps[0].fqdn, pDnodeInfo->ep.fqdn, TSDB_FQDN_LEN);
|
||||||
|
epSet.eps[0].port = pDnodeInfo->ep.port;
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
if ((code = tmsgSendReq(&epSet, &rpcMsg)) != 0) {
|
||||||
|
uWarn("failed to send grant status msg since %s", tstrerror(code));
|
||||||
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) {
|
||||||
|
if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
SName name = {0};
|
||||||
|
int32_t sqlLen = 0;
|
||||||
|
char sql[256] = {0};
|
||||||
|
char skeyStr[40] = {0};
|
||||||
|
char ekeyStr[40] = {0};
|
||||||
|
char *pDbName = pDb->name;
|
||||||
|
|
||||||
|
if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) == 0) {
|
||||||
|
pDbName = name.dbname;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosFormatUtcTime(skeyStr, sizeof(skeyStr), tw->skey, pDb->cfg.precision) == 0 &&
|
||||||
|
taosFormatUtcTime(ekeyStr, sizeof(ekeyStr), tw->ekey, pDb->cfg.precision) == 0) {
|
||||||
|
sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with '%s' end with '%s'", pDbName, skeyStr, ekeyStr);
|
||||||
|
} else {
|
||||||
|
sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey,
|
||||||
|
tw->ekey);
|
||||||
|
}
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "autoCompactDB", name.dbname, "", sql, sqlLen);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
extern int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds);
|
||||||
static int32_t mndCompactDispatch(SRpcMsg *pReq) {
|
static int32_t mndCompactDispatch(SRpcMsg *pReq) {
|
||||||
int32_t code = 0, lino = 0;
|
int32_t code = 0;
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
|
int64_t nowSec = now / 60000;
|
||||||
|
|
||||||
void *pDbIter = NULL;
|
void *pIter = NULL;
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
while ((pDbIter = sdbFetch(pSdb, SDB_DB, pDbIter, (void **)&pDb))) {
|
while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) {
|
||||||
if ((pDb->cfg.compactInterval == 0) || (pDb->cfg.compactStartTime >= pDb->cfg.compactEndTime)) {
|
if ((pDb->cfg.compactInterval == 0) || (pDb->cfg.compactStartTime >= pDb->cfg.compactEndTime)) {
|
||||||
sdbRelease(pSdb, pDb);
|
sdbRelease(pSdb, pDb);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// mndCompactSendProgressReq(pMnode, pDb);
|
if (pDb->compactStartTime == nowSec) {
|
||||||
|
assert(0);
|
||||||
|
sdbRelease(pSdb, pDb);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (((nowSec + pDb->cfg.compactTimeOffset) % pDb->cfg.compactInterval) != 0) {
|
||||||
|
sdbRelease(pSdb, pDb);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
STimeWindow tw = {.skey = convertTimePrecision(now - (int64_t)pDb->cfg.compactStartTime * 60000,
|
||||||
|
TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision),
|
||||||
|
.ekey = convertTimePrecision(now - (int64_t)pDb->cfg.compactEndTime * 60000,
|
||||||
|
TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)};
|
||||||
|
|
||||||
|
if ((code = mndCompactDb(pMnode, pReq, pDb, tw, NULL)) == 0) {
|
||||||
|
pDb->compactStartTime = nowSec;
|
||||||
|
mInfo("db:%s, dispatch auto compact with range:[%" PRIi64 ",%" PRIi64
|
||||||
|
"], interval:%d, start:%d, end:%d, offset:%" PRIi8,
|
||||||
|
pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, pDb->cfg.compactStartTime, pDb->cfg.compactEndTime,
|
||||||
|
pDb->cfg.compactTimeOffset);
|
||||||
|
} else {
|
||||||
|
mWarn("db:%s, failed to dispatch auto compact with range:[%" PRIi64 ",%" PRIi64
|
||||||
|
"], interval:%d, start:%d, end:%d, offset:%" PRIi8 ", since %s",
|
||||||
|
pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, pDb->cfg.compactStartTime, pDb->cfg.compactEndTime,
|
||||||
|
pDb->cfg.compactTimeOffset, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_UNUSED(mndCompactDispatchAudit(pMnode, pReq, pDb, &tw));
|
||||||
|
|
||||||
sdbRelease(pSdb, pDb);
|
sdbRelease(pSdb, pDb);
|
||||||
}
|
}
|
||||||
_exit:
|
|
||||||
if (code) {
|
return 0;
|
||||||
mWarn("failed to dispatch auto compact at line %d since %s", lino, tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {
|
static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {
|
||||||
|
|
Loading…
Reference in New Issue