diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 3a47b87b8b..2346e060f6 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -423,10 +423,10 @@ typedef struct { int8_t s3Compact; int8_t withArbitrator; int8_t encryptAlgorithm; - int8_t compactTimeOffset; - int32_t compactInterval; - int32_t compactStartTime; - int32_t compactEndTime; + int8_t compactTimeOffset; // hour + int32_t compactInterval; // minute + int32_t compactStartTime; // minute + int32_t compactEndTime; // minute } SDbCfg; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c index 0647134547..47ed48bb05 100644 --- a/source/dnode/mnode/impl/src/mndCompact.c +++ b/source/dnode/mnode/impl/src/mndCompact.c @@ -891,28 +891,103 @@ static void mndCompactPullup(SMnode *pMnode) { taosArrayDestroy(pArray); } +static int32_t mndCompactDispatchReq(SMnode *pMnode, SDnodeInfo *pDnodeInfo, int32_t contLen, void *pCont) { + // send grant status to dnode + SRpcMsg rpcMsg = { + .pCont = pCont, .contLen = contLen, .msgType = TDMT_MND_GRANT, .info.ahandle = (void *)0x818, .info.noResp = 1}; + + SEpSet epSet = {.numOfEps = 1}; + tstrncpy(epSet.eps[0].fqdn, pDnodeInfo->ep.fqdn, TSDB_FQDN_LEN); + epSet.eps[0].port = pDnodeInfo->ep.port; + + int32_t code = 0; + if ((code = tmsgSendReq(&epSet, &rpcMsg)) != 0) { + uWarn("failed to send grant status msg since %s", tstrerror(code)); + TAOS_RETURN(code); + } + + TAOS_RETURN(TSDB_CODE_SUCCESS); +} + +static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) { + if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) { + return 0; + } + + SName name = {0}; + int32_t sqlLen = 0; + char sql[256] = {0}; + char skeyStr[40] = {0}; + char ekeyStr[40] = {0}; + char *pDbName = pDb->name; + + if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) == 0) { + pDbName = name.dbname; + } + + if (taosFormatUtcTime(skeyStr, sizeof(skeyStr), tw->skey, pDb->cfg.precision) == 0 && + taosFormatUtcTime(ekeyStr, sizeof(ekeyStr), tw->ekey, pDb->cfg.precision) == 0) { + sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with '%s' end with '%s'", pDbName, skeyStr, ekeyStr); + } else { + sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey, + tw->ekey); + } + auditRecord(pReq, pMnode->clusterId, "autoCompactDB", name.dbname, "", sql, sqlLen); + + return 0; +} + +extern int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds); static int32_t mndCompactDispatch(SRpcMsg *pReq) { - int32_t code = 0, lino = 0; + int32_t code = 0; SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; int64_t now = taosGetTimestampMs(); + int64_t nowSec = now / 60000; - void *pDbIter = NULL; + void *pIter = NULL; SDbObj *pDb = NULL; - while ((pDbIter = sdbFetch(pSdb, SDB_DB, pDbIter, (void **)&pDb))) { + while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) { if ((pDb->cfg.compactInterval == 0) || (pDb->cfg.compactStartTime >= pDb->cfg.compactEndTime)) { sdbRelease(pSdb, pDb); continue; } - // mndCompactSendProgressReq(pMnode, pDb); + if (pDb->compactStartTime == nowSec) { + assert(0); + sdbRelease(pSdb, pDb); + continue; + } + + if (((nowSec + pDb->cfg.compactTimeOffset) % pDb->cfg.compactInterval) != 0) { + sdbRelease(pSdb, pDb); + continue; + } + + STimeWindow tw = {.skey = convertTimePrecision(now - (int64_t)pDb->cfg.compactStartTime * 60000, + TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision), + .ekey = convertTimePrecision(now - (int64_t)pDb->cfg.compactEndTime * 60000, + TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)}; + + if ((code = mndCompactDb(pMnode, pReq, pDb, tw, NULL)) == 0) { + pDb->compactStartTime = nowSec; + mInfo("db:%s, dispatch auto compact with range:[%" PRIi64 ",%" PRIi64 + "], interval:%d, start:%d, end:%d, offset:%" PRIi8, + pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, pDb->cfg.compactStartTime, pDb->cfg.compactEndTime, + pDb->cfg.compactTimeOffset); + } else { + mWarn("db:%s, failed to dispatch auto compact with range:[%" PRIi64 ",%" PRIi64 + "], interval:%d, start:%d, end:%d, offset:%" PRIi8 ", since %s", + pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, pDb->cfg.compactStartTime, pDb->cfg.compactEndTime, + pDb->cfg.compactTimeOffset, tstrerror(code)); + } + + TAOS_UNUSED(mndCompactDispatchAudit(pMnode, pReq, pDb, &tw)); + sdbRelease(pSdb, pDb); } -_exit: - if (code) { - mWarn("failed to dispatch auto compact at line %d since %s", lino, tstrerror(code)); - } - return code; + + return 0; } static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {