diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 763c4ae6e7..3de291cb91 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -130,6 +130,7 @@ extern bool tsKeepColumnName; extern bool tsEnableQueryHb; extern bool tsEnableScience; extern bool tsTtlChangeOnWrite; +extern int32_t tsTtlFlushThreshold; extern int32_t tsRedirectPeriod; extern int32_t tsRedirectFactor; extern int32_t tsRedirectMaxPeriod; @@ -186,7 +187,9 @@ extern int32_t tsTransPullupInterval; extern int32_t tsMqRebalanceInterval; extern int32_t tsStreamCheckpointTickInterval; extern int32_t tsTtlUnit; -extern int32_t tsTtlPushInterval; +extern int32_t tsTtlPushIntervalSec; +extern int32_t tsTtlBatchDropNum; +extern int32_t tsTrimVDbIntervalSec; extern int32_t tsGrantHBInterval; extern int32_t tsUptimeInterval; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c6bb599a7d..21cad322bb 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1161,6 +1161,9 @@ int32_t tDeserializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq); typedef struct { int32_t timestampSec; + int32_t ttlDropMaxCount; + int32_t nUids; + SArray* pTbUids; } SVDropTtlTableReq; int32_t tSerializeSVDropTtlTableReq(void* buf, int32_t bufLen, SVDropTtlTableReq* pReq); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 483122b070..323b2de6ba 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -65,7 +65,7 @@ enum { #define TD_NEW_MSG_SEG(TYPE) TYPE = ((TYPE##_SEG_CODE) << 8), #define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE, TYPE##_RSP, -enum { +enum { // WARN: new msg should be appended to segment tail #endif TD_NEW_MSG_SEG(TDMT_DND_MSG) TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL) @@ -89,15 +89,15 @@ enum { TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "create-acct", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "create-acct", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_ACCT, "alter-acct", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_ACCT, "drop-acct", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_CREATE_USER, "create-user", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_USER, "create-user", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_USER, "alter-user", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_DROP_USER, "drop-user", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_AUTH, "get-user-auth", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_DROP_USER, "drop-user", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_AUTH, "get-user-auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_DNODE, "create-dnode", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_DNODE, "config-dnode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_DNODE, "config-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_DNODE, "drop-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_MNODE, "create-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_MNODE, "alter-mnode", NULL, NULL) @@ -182,6 +182,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) @@ -296,7 +297,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL) - + TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 6879ea05da..86a165f14a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -124,7 +124,6 @@ int32_t tsQueryRspPolicy = 0; int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT bool tsEnableQueryHb = true; bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true -bool tsTtlChangeOnWrite = false; // ttl delete time changes on last write if true int32_t tsQuerySmaOptimize = 0; int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data. bool tsQueryPlannerTrace = false; @@ -226,12 +225,20 @@ bool tsStartUdfd = true; // wal int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L); +// ttl +bool tsTtlChangeOnWrite = false; // if true, ttl delete time changes on last write +int32_t tsTtlFlushThreshold = 100; /* maximum number of dirty items in memory. + * if -1, flush will not be triggered by write-ops + */ +int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch + // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; int32_t tsStreamCheckpointTickInterval = 1; int32_t tsTtlUnit = 86400; -int32_t tsTtlPushInterval = 3600; +int32_t tsTtlPushIntervalSec = 10; +int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups int32_t tsGrantHBInterval = 60; int32_t tsUptimeInterval = 300; // seconds char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits @@ -604,8 +611,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushIntervalSec, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "ttlBatchDropNum", tsTtlBatchDropNum, 0, INT32_MAX, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "ttlChangeOnWrite", tsTtlChangeOnWrite, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "ttlFlushThreshold", tsTtlFlushThreshold, -1, 1000000, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "trimVDbIntervalSec", tsTrimVDbIntervalSec, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER) != 0) return -1; @@ -994,6 +1004,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval; tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval; + tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32; tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32; tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; @@ -1003,7 +1014,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32; - tsTtlPushInterval = cfgGetItem(pCfg, "ttlPushInterval")->i32; + tsTtlPushIntervalSec = cfgGetItem(pCfg, "ttlPushInterval")->i32; + tsTtlBatchDropNum = cfgGetItem(pCfg, "ttlBatchDropNum")->i32; + tsTrimVDbIntervalSec = cfgGetItem(pCfg, "trimVDbIntervalSec")->i32; tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; @@ -1405,13 +1418,19 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) { } else if (strcasecmp("ttlUnit", name) == 0) { tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32; } else if (strcasecmp("ttlPushInterval", name) == 0) { - tsTtlPushInterval = cfgGetItem(pCfg, "ttlPushInterval")->i32; + tsTtlPushIntervalSec = cfgGetItem(pCfg, "ttlPushInterval")->i32; + } else if (strcasecmp("ttlBatchDropNum", name) == 0) { + tsTtlBatchDropNum = cfgGetItem(pCfg, "ttlBatchDropNum")->i32; + } else if (strcasecmp("trimVDbIntervalSec", name) == 0) { + tsTrimVDbIntervalSec = cfgGetItem(pCfg, "trimVDbIntervalSec")->i32; } else if (strcasecmp("tmrDebugFlag", name) == 0) { tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32; } else if (strcasecmp("tsdbDebugFlag", name) == 0) { tsdbDebugFlag = cfgGetItem(pCfg, "tsdbDebugFlag")->i32; } else if (strcasecmp("tqDebugFlag", name) == 0) { tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32; + } else if (strcasecmp("ttlFlushThreshold", name) == 0) { + tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32; } break; } @@ -1613,6 +1632,20 @@ void taosCfgDynamicOptions(const char *option, const char *value) { return; } + if (strcasecmp(option, "ttlPushInterval") == 0) { + int32_t newTtlPushInterval = atoi(value); + uInfo("ttlPushInterval set from %d to %d", tsTtlPushIntervalSec, newTtlPushInterval); + tsTtlPushIntervalSec = newTtlPushInterval; + return; + } + + if (strcasecmp(option, "ttlBatchDropNum") == 0) { + int32_t newTtlBatchDropNum = atoi(value); + uInfo("ttlBatchDropNum set from %d to %d", tsTtlBatchDropNum, newTtlBatchDropNum); + tsTtlBatchDropNum = newTtlBatchDropNum; + return; + } + const char *options[] = { "dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag", "tsdbDebugFlag", "tqDebugFlag", "fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag", "tdbDebugFlag", "tmrDebugFlag", "uDebugFlag", diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index de76881467..27b6499497 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3179,6 +3179,12 @@ int32_t tSerializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableReq if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI32(&encoder, pReq->timestampSec) < 0) return -1; + if (tEncodeI32(&encoder, pReq->ttlDropMaxCount) < 0) return -1; + if (tEncodeI32(&encoder, pReq->nUids) < 0) return -1; + for (int32_t i = 0; i < pReq->nUids; ++i) { + tb_uid_t *pTbUid = taosArrayGet(pReq->pTbUids, i); + if (tEncodeI64(&encoder, *pTbUid) < 0) return -1; + } tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -3192,6 +3198,30 @@ int32_t tDeserializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableR if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pReq->timestampSec) < 0) return -1; + pReq->ttlDropMaxCount = INT32_MAX; + pReq->nUids = 0; + pReq->pTbUids = NULL; + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI32(&decoder, &pReq->ttlDropMaxCount) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->nUids) < 0) return -1; + + if (pReq->nUids > 0) { + pReq->pTbUids = taosArrayInit(pReq->nUids, sizeof(tb_uid_t)); + if (pReq->pTbUids == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + + tb_uid_t tbUid = 0; + for (int32_t i = 0; i < pReq->nUids; ++i) { + if (tDecodeI64(&decoder, &tbUid) < 0) return -1; + if (taosArrayPush(pReq->pTbUids, &tbUid) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + } tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 302d4dafd1..ae1b46a21d 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -33,10 +33,10 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { return -1; } - SMnodeOpt option = {.deploy = true, .numOfReplicas = createReq.replica, - .numOfTotalReplicas = createReq.replica + createReq.learnerReplica, + SMnodeOpt option = {.deploy = true, .numOfReplicas = createReq.replica, + .numOfTotalReplicas = createReq.replica + createReq.learnerReplica, .selfIndex = -1, .lastIndex = createReq.lastIndex}; - + memcpy(option.replicas, createReq.replicas, sizeof(createReq.replicas)); for (int32_t i = 0; i < createReq.replica; ++i) { if (createReq.replicas[i].id == pInput->pData->dnodeId) { @@ -191,6 +191,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 39285ced5d..917c6a00bc 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -1123,6 +1123,36 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { strcpy(dcfgReq.config, "keeptimeoffset"); snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); + } else if (strncasecmp(cfgReq.config, "ttlpushinterval", 14) == 0) { + int32_t optLen = strlen("ttlpushinterval"); + int32_t flag = -1; + int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag); + if (code < 0) return code; + + if (flag < 0 || flag > 100000) { + mError("dnode:%d, failed to config ttlPushInterval since value:%d. Valid range: [0, 100000]", cfgReq.dnodeId, + flag); + terrno = TSDB_CODE_INVALID_CFG; + return -1; + } + + strcpy(dcfgReq.config, "ttlpushinterval"); + snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); + } else if (strncasecmp(cfgReq.config, "ttlbatchdropnum", 15) == 0) { + int32_t optLen = strlen("ttlbatchdropnum"); + int32_t flag = -1; + int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag); + if (code < 0) return code; + + if (flag < 0) { + mError("dnode:%d, failed to config ttlBatchDropNum since value:%d. Valid range: [0, %d]", cfgReq.dnodeId, + flag, INT32_MAX); + terrno = TSDB_CODE_INVALID_CFG; + return -1; + } + + strcpy(dcfgReq.config, "ttlbatchdropnum"); + snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); #ifdef TD_ENTERPRISE } else if (strncasecmp(cfgReq.config, "activeCode", 10) == 0 || strncasecmp(cfgReq.config, "cActiveCode", 11) == 0) { int8_t opt = strncasecmp(cfgReq.config, "a", 1) == 0 ? DND_ACTIVE_CODE : DND_CONN_ACTIVE_CODE; @@ -1376,7 +1406,7 @@ static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t opLen, int32_ return 0; _err: - mError("dnode:%d, failed to config keeptimeoffset since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config); + mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config); terrno = TSDB_CODE_INVALID_CFG; return -1; } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index b28ad207ea..fd4ebf549f 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -119,6 +119,14 @@ static void mndPullupTtl(SMnode *pMnode) { tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } +static void mndPullupTrimDb(SMnode *pMnode) { + mTrace("pullup trim"); + int32_t contLen = 0; + void *pReq = mndBuildTimerMsg(&contLen); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRIM_DB_TIMER, .pCont = pReq, .contLen = contLen}; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); +} + static void mndCalMqRebalance(SMnode *pMnode) { mTrace("calc mq rebalance"); int32_t contLen = 0; @@ -255,10 +263,14 @@ static void *mndThreadFp(void *param) { if (lastTime % 10 != 0) continue; int64_t sec = lastTime / 10; - if (sec % tsTtlPushInterval == 0) { + if (sec % tsTtlPushIntervalSec == 0) { mndPullupTtl(pMnode); } + if (sec % tsTrimVDbIntervalSec == 0) { + mndPullupTrimDb(pMnode); + } + if (sec % tsTransPullupInterval == 0) { mndPullupTrans(pMnode); } @@ -661,7 +673,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { _OVER: if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || - pMsg->msgType == TDMT_MND_UPTIME_TIMER) { + pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER) { mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, pMnode->stopped, state.restored, syncStr(state.state)); return -1; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index c3899ec433..0d41650394 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -40,10 +40,12 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew); static int32_t mndProcessTtlTimer(SRpcMsg *pReq); +static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq); static int32_t mndProcessCreateStbReq(SRpcMsg *pReq); static int32_t mndProcessAlterStbReq(SRpcMsg *pReq); static int32_t mndProcessDropStbReq(SRpcMsg *pReq); -static int32_t mndProcessDropTtltbReq(SRpcMsg *pReq); +static int32_t mndProcessDropTtltbRsp(SRpcMsg *pReq); +static int32_t mndProcessTrimDbRsp(SRpcMsg *pReq); static int32_t mndProcessTableMetaReq(SRpcMsg *pReq); static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -72,11 +74,13 @@ int32_t mndInitStb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbReq); + mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbRsp); + mndSetMsgHandle(pMnode, TDMT_VND_TRIM_RSP, mndProcessTrimDbRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq); mndSetMsgHandle(pMnode, TDMT_MND_TTL_TIMER, mndProcessTtlTimer); + mndSetMsgHandle(pMnode, TDMT_MND_TRIM_DB_TIMER, mndProcessTrimDbTimer); mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq); // mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq); @@ -919,11 +923,12 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; void *pIter = NULL; - SVDropTtlTableReq ttlReq = {.timestampSec = taosGetTimestampSec()}; - int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq); - int32_t contLen = reqLen + sizeof(SMsgHead); + SVDropTtlTableReq ttlReq = { + .timestampSec = taosGetTimestampSec(), .ttlDropMaxCount = tsTtlBatchDropNum, .nUids = 0, .pTbUids = NULL}; + int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq); + int32_t contLen = reqLen + sizeof(SMsgHead); - mInfo("start to process ttl timer"); + mDebug("start to process ttl timer"); while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); @@ -936,7 +941,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) { } pHead->contLen = htonl(contLen); pHead->vgId = htonl(pVgroup->vgId); - tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), contLen, &ttlReq); + tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), reqLen, &ttlReq); SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen, .info = pReq->info}; SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); @@ -944,7 +949,44 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) { if (code != 0) { mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code); } else { - mInfo("vgId:%d, send drop ttl table request to vnode, time:%" PRId32, pVgroup->vgId, ttlReq.timestampSec); + mDebug("vgId:%d, send drop ttl table request to vnode, time:%" PRId32, pVgroup->vgId, ttlReq.timestampSec); + } + sdbRelease(pSdb, pVgroup); + } + + return 0; +} + +static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; + void *pIter = NULL; + SVTrimDbReq trimReq = {.timestamp = taosGetTimestampSec()}; + int32_t reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq); + int32_t contLen = reqLen + sizeof(SMsgHead); + + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + SMsgHead *pHead = rpcMallocCont(contLen); + if (pHead == NULL) { + sdbCancelFetch(pSdb, pVgroup); + sdbRelease(pSdb, pVgroup); + continue; + } + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pVgroup->vgId); + tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), reqLen, &trimReq); + + SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen}; + SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); + int32_t code = tmsgSendReq(&epSet, &rpcMsg); + if (code != 0) { + mError("vgId:%d, timer failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code); + } else { + mInfo("vgId:%d, timer send vnode-trim request to vnode, time:%d", pVgroup->vgId, trimReq.timestamp); } sdbRelease(pSdb, pVgroup); } @@ -2405,7 +2447,8 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, return 0; } -static int32_t mndProcessDropTtltbReq(SRpcMsg *pRsp) { return 0; } +static int32_t mndProcessDropTtltbRsp(SRpcMsg *pRsp) { return 0; } +static int32_t mndProcessTrimDbRsp(SRpcMsg *pRsp) { return 0; } static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; diff --git a/source/dnode/vnode/src/inc/metaTtl.h b/source/dnode/vnode/src/inc/metaTtl.h index 45faceb1ea..c2cd389dab 100644 --- a/source/dnode/vnode/src/inc/metaTtl.h +++ b/source/dnode/vnode/src/inc/metaTtl.h @@ -31,15 +31,14 @@ typedef enum DirtyEntryType { } DirtyEntryType; typedef struct STtlManger { - TdThreadRwlock lock; + TTB* pOldTtlIdx; // btree<{deleteTime, tuid}, NULL> - TTB* pOldTtlIdx; // btree<{deleteTime, tuid}, NULL> - - SHashObj* pTtlCache; // key: tuid, value: {ttl, ctime} - SHashObj* pDirtyUids; // dirty tuid + SHashObj* pTtlCache; // hash + SHashObj* pDirtyUids; // hash TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl> - char* logPrefix; + char* logPrefix; + int32_t flushThreshold; // max dirty entry number in memory. if -1, flush will not be triggered by write-ops } STtlManger; typedef struct { @@ -68,23 +67,24 @@ typedef struct { typedef struct { tb_uid_t uid; int64_t changeTimeMs; + TXN* pTxn; } STtlUpdCtimeCtx; typedef struct { tb_uid_t uid; int64_t changeTimeMs; int64_t ttlDays; + TXN* pTxn; } STtlUpdTtlCtx; typedef struct { tb_uid_t uid; - TXN* pTxn; int64_t ttlDays; + TXN* pTxn; } STtlDelTtlCtx; -int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback, const char* logPrefix); +int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback, const char* logPrefix, int32_t flushThreshold); void ttlMgrClose(STtlManger* pTtlMgr); -int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta); bool ttlMgrNeedUpgrade(TDB* pEnv); int ttlMgrUpgrade(STtlManger* pTtlMgr, void* pMeta); @@ -94,7 +94,7 @@ int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx); int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx); int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn); -int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids); +int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids, int32_t ttlDropMaxCount); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 50b8e625f9..be663c2be9 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -151,9 +151,10 @@ int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid); int32_t metaTrimTables(SMeta* pMeta); -int metaTtlDropTable(SMeta* pMeta, int64_t timePointMs, SArray* tbUids); +void metaDropTables(SMeta* pMeta, SArray* tbUids); +int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); -int metaUpdateChangeTime(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs); +int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema); diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 517d9692c7..3d445acd67 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -130,7 +130,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { // open pTtlMgr ("ttlv1.idx") char logPrefix[128] = {0}; sprintf(logPrefix, "vgId:%d", TD_VID(pVnode)); - ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0, logPrefix); + ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0, logPrefix, tsTtlFlushThreshold); if (ret < 0) { metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index f56837f759..9a298a4bb7 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -21,6 +21,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME); +static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs); static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME); @@ -842,9 +843,11 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi return 0; } -static void metaDropTables(SMeta *pMeta, SArray *tbUids) { +void metaDropTables(SMeta *pMeta, SArray *tbUids) { + if (taosArrayGetSize(tbUids) == 0) return; + metaWLock(pMeta); - for (int i = 0; i < TARRAY_SIZE(tbUids); ++i) { + for (int i = 0; i < taosArrayGetSize(tbUids); ++i) { tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i); metaDropTableByUid(pMeta, uid, NULL); metaDebug("batch drop table:%" PRId64, uid); @@ -927,26 +930,23 @@ end: return code; } -int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) { +int metaTtlFindExpired(SMeta *pMeta, int64_t timePointMs, SArray *tbUids, int32_t ttlDropMaxCount) { + metaWLock(pMeta); int ret = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn); if (ret != 0) { metaError("ttl failed to flush, ret:%d", ret); - return ret; + goto _err; } - ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids); + ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids, ttlDropMaxCount); if (ret != 0) { metaError("ttl failed to find expired table, ret:%d", ret); - return ret; - } - if (TARRAY_SIZE(tbUids) == 0) { - return 0; + goto _err; } - metaInfo("ttl find expired table count: %zu", TARRAY_SIZE(tbUids)); - - metaDropTables(pMeta, tbUids); - return 0; +_err: + metaULock(pMeta); + return ret; } static int metaBuildBtimeIdxKey(SBtimeIdxKey *btimeKey, const SMetaEntry *pME) { @@ -1326,10 +1326,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl metaSaveToSkmDb(pMeta, &entry); - metaULock(pMeta); - metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs); + metaULock(pMeta); + metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp); if (entry.pBuf) taosMemoryFree(entry.pBuf); @@ -1515,10 +1515,10 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid); metaTbGroupCacheClear(pMeta, ctbEntry.ctbEntry.suid); - metaULock(pMeta); - metaUpdateChangeTime(pMeta, ctbEntry.uid, pAlterTbReq->ctimeMs); + metaULock(pMeta); + tDecoderClear(&dc1); tDecoderClear(&dc2); taosMemoryFree((void *)ctbEntry.ctbEntry.pTags); @@ -1630,10 +1630,10 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p // save to table db metaSaveToTbDb(pMeta, &entry); metaUpdateUidIdx(pMeta, &entry); - metaULock(pMeta); - metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs); + metaULock(pMeta); + tdbTbcClose(pTbDbc); tdbTbcClose(pUidIdxc); tDecoderClear(&dc); @@ -1981,7 +1981,7 @@ static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) { if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0; - STtlUpdTtlCtx ctx = {.uid = pME->uid}; + STtlUpdTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn}; if (pME->type == TSDB_CHILD_TABLE) { ctx.ttlDays = pME->ctbEntry.ttlDays; ctx.changeTimeMs = pME->ctbEntry.btime; @@ -1993,7 +1993,7 @@ static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) { return ttlMgrInsertTtl(pMeta->pTtlMgr, &ctx); } -int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) { +static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) { if (!tsTtlChangeOnWrite) return 0; if (changeTimeMs <= 0) { @@ -2001,11 +2001,20 @@ int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) { return TSDB_CODE_VERSION_NOT_COMPATIBLE; } - STtlUpdCtimeCtx ctx = {.uid = uid, .changeTimeMs = changeTimeMs}; + STtlUpdCtimeCtx ctx = {.uid = uid, .changeTimeMs = changeTimeMs, .pTxn = pMeta->txn}; return ttlMgrUpdateChangeTime(pMeta->pTtlMgr, &ctx); } +int metaUpdateChangeTimeWithLock(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) { + if (!tsTtlChangeOnWrite) return 0; + + metaWLock(pMeta); + int ret = metaUpdateChangeTime(pMeta, uid, changeTimeMs); + metaULock(pMeta); + return ret; +} + static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) { SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid}; diff --git a/source/dnode/vnode/src/meta/metaTtl.c b/source/dnode/vnode/src/meta/metaTtl.c index 45f697258c..3c45982311 100644 --- a/source/dnode/vnode/src/meta/metaTtl.c +++ b/source/dnode/vnode/src/meta/metaTtl.c @@ -21,6 +21,13 @@ typedef struct { SMeta *pMeta; } SConvertData; +typedef struct { + int32_t ttlDropMaxCount; + int32_t count; + STtlIdxKeyV1 expiredKey; + SArray *pTbUids; +} STtlExpiredCtx; + static void ttlMgrCleanup(STtlManger *pTtlMgr); static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta); @@ -31,15 +38,15 @@ static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, static int ttlMgrFillCache(STtlManger *pTtlMgr); static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pTtlCache); static int32_t ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData); +static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, + void *pExpiredInfo); -static int32_t ttlMgrWLock(STtlManger *pTtlMgr); -static int32_t ttlMgrRLock(STtlManger *pTtlMgr); -static int32_t ttlMgrULock(STtlManger *pTtlMgr); +static bool ttlMgrNeedFlush(STtlManger *pTtlMgr); const char *ttlTbname = "ttl.idx"; const char *ttlV1Tbname = "ttlv1.idx"; -int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix) { +int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix, int32_t flushThreshold) { int ret = TSDB_CODE_SUCCESS; int64_t startNs = taosGetTimestampNs(); @@ -55,6 +62,7 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *lo } strcpy(logBuffer, logPrefix); pTtlMgr->logPrefix = logBuffer; + pTtlMgr->flushThreshold = flushThreshold; ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback); if (ret < 0) { @@ -66,8 +74,6 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *lo pTtlMgr->pTtlCache = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); pTtlMgr->pDirtyUids = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - taosThreadRwlockInit(&pTtlMgr->lock, NULL); - ret = ttlMgrFillCache(pTtlMgr); if (ret < 0) { metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno)); @@ -130,6 +136,7 @@ int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) { int64_t endNs = taosGetTimestampNs(); metaInfo("%s, ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs); + _out: tdbTbClose(pTtlMgr->pOldTtlIdx); pTtlMgr->pOldTtlIdx = NULL; @@ -142,7 +149,6 @@ static void ttlMgrCleanup(STtlManger *pTtlMgr) { taosHashCleanup(pTtlMgr->pTtlCache); taosHashCleanup(pTtlMgr->pDirtyUids); tdbTbClose(pTtlMgr->pTtlIdx); - taosThreadRwlockDestroy(&pTtlMgr->lock); taosMemoryFree(pTtlMgr); } @@ -229,10 +235,25 @@ static int ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, } ret = 0; + _out: return ret; } +static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, + void *pExpiredCtx) { + STtlExpiredCtx *pCtx = (STtlExpiredCtx *)pExpiredCtx; + if (pCtx->count >= pCtx->ttlDropMaxCount) return -1; + + int c = ttlIdxKeyV1Cmpr(&pCtx->expiredKey, sizeof(pCtx->expiredKey), pKey, keyLen); + if (c > 0) { + taosArrayPush(pCtx->pTbUids, &((STtlIdxKeyV1 *)pKey)->uid); + pCtx->count++; + } + + return c; +} + static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) { SMeta *meta = pMeta; @@ -255,8 +276,6 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) { STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays, .changeTimeMs = updCtx->changeTimeMs}; STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT}; - ttlMgrWLock(pTtlMgr); - int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry)); if (ret < 0) { metaError("%s, ttlMgr insert failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno)); @@ -269,10 +288,13 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) { goto _out; } - ret = 0; -_out: - ttlMgrULock(pTtlMgr); + if (ttlMgrNeedFlush(pTtlMgr)) { + ttlMgrFlush(pTtlMgr, updCtx->pTxn); + } + ret = 0; + +_out: metaDebug("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix, updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays); @@ -281,7 +303,6 @@ _out: int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) { if (delCtx->ttlDays == 0) return 0; - ttlMgrWLock(pTtlMgr); STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL}; @@ -291,18 +312,19 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) { goto _out; } - ret = 0; -_out: - ttlMgrULock(pTtlMgr); + if (ttlMgrNeedFlush(pTtlMgr)) { + ttlMgrFlush(pTtlMgr, delCtx->pTxn); + } + ret = 0; + +_out: metaDebug("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid); return ret; } int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) { - ttlMgrWLock(pTtlMgr); - int ret = 0; STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid)); @@ -327,59 +349,35 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime goto _out; } - ret = 0; -_out: - ttlMgrULock(pTtlMgr); + if (ttlMgrNeedFlush(pTtlMgr)) { + ttlMgrFlush(pTtlMgr, pUpdCtimeCtx->pTxn); + } + ret = 0; + +_out: metaDebug("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid, pUpdCtimeCtx->changeTimeMs); return ret; } -int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids) { - ttlMgrRLock(pTtlMgr); +int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) { + STtlIdxKeyV1 ttlKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX}; + STtlExpiredCtx expiredCtx = { + .ttlDropMaxCount = ttlDropMaxCount, .count = 0, .expiredKey = ttlKey, .pTbUids = pTbUids}; + return tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry); +} - TBC *pCur; - int ret = tdbTbcOpen(pTtlMgr->pTtlIdx, &pCur, NULL); - if (ret < 0) { - goto _out; - } - - STtlIdxKeyV1 ttlKey = {0}; - ttlKey.deleteTimeMs = timePointMs; - ttlKey.uid = INT64_MAX; - int c = 0; - tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &c); - if (c < 0) { - tdbTbcMoveToPrev(pCur); - } - - void *pKey = NULL; - int kLen = 0; - while (1) { - ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL); - if (ret < 0) { - ret = 0; - break; - } - ttlKey = *(STtlIdxKeyV1 *)pKey; - taosArrayPush(pTbUids, &ttlKey.uid); - } - - tdbFree(pKey); - tdbTbcClose(pCur); - - ret = 0; -_out: - ttlMgrULock(pTtlMgr); - return ret; +static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) { + return pTtlMgr->flushThreshold > 0 && taosHashGetSize(pTtlMgr->pDirtyUids) > pTtlMgr->flushThreshold; } int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { - ttlMgrWLock(pTtlMgr); + int64_t startNs = taosGetTimestampNs(); + int64_t endNs = startNs; - metaDebug("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids)); + metaTrace("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids)); int ret = -1; @@ -430,40 +428,10 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { taosHashClear(pTtlMgr->pDirtyUids); ret = 0; + _out: - ttlMgrULock(pTtlMgr); - - metaDebug("%s, ttl mgr flush end.", pTtlMgr->logPrefix); - - return ret; -} - -static int32_t ttlMgrRLock(STtlManger *pTtlMgr) { - int32_t ret = 0; - - metaTrace("%s, ttlMgr rlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock); - - ret = taosThreadRwlockRdlock(&pTtlMgr->lock); - - return ret; -} - -static int32_t ttlMgrWLock(STtlManger *pTtlMgr) { - int32_t ret = 0; - - metaTrace("%s, ttlMgr wlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock); - - ret = taosThreadRwlockWrlock(&pTtlMgr->lock); - - return ret; -} - -static int32_t ttlMgrULock(STtlManger *pTtlMgr) { - int32_t ret = 0; - - metaTrace("%s, ttlMgr ulock %p", pTtlMgr->logPrefix, &pTtlMgr->lock); - - ret = taosThreadRwlockUnlock(&pTtlMgr->lock); + endNs = taosGetTimestampNs(); + metaTrace("%s, ttl mgr flush end, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, endNs - startNs); return ret; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 52c36fcb1b..a9399e4db1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -142,6 +142,74 @@ _exit: return code; } +static int32_t vnodePreProcessDropTtlMsg(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = TSDB_CODE_INVALID_MSG; + int32_t lino = 0; + + SMsgHead *pContOld = pMsg->pCont; + int32_t reqLenOld = pMsg->contLen - sizeof(SMsgHead); + + SArray *tbUids = NULL; + int64_t timestampMs = 0; + + SVDropTtlTableReq ttlReq = {0}; + if (tDeserializeSVDropTtlTableReq((char *)pContOld + sizeof(SMsgHead), reqLenOld, &ttlReq) != 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + { // find expired uids + tbUids = taosArrayInit(8, sizeof(int64_t)); + if (tbUids == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + timestampMs = (int64_t)ttlReq.timestampSec * 1000; + code = metaTtlFindExpired(pVnode->pMeta, timestampMs, tbUids, ttlReq.ttlDropMaxCount); + if (code != 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + ttlReq.nUids = taosArrayGetSize(tbUids); + ttlReq.pTbUids = tbUids; + } + + { // prepare new content + int32_t reqLenNew = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq); + int32_t contLenNew = reqLenNew + sizeof(SMsgHead); + + SMsgHead *pContNew = rpcMallocCont(contLenNew); + if (pContNew == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + tSerializeSVDropTtlTableReq((char *)pContNew + sizeof(SMsgHead), reqLenNew, &ttlReq); + pContNew->contLen = htonl(reqLenNew); + pContNew->vgId = pContOld->vgId; + + rpcFreeCont(pContOld); + pMsg->pCont = pContNew; + pMsg->contLen = contLenNew; + } + + code = 0; + +_exit: + taosArrayDestroy(tbUids); + + if (code) { + vError("vgId:%d, %s:%d failed to preprocess drop ttl request since %s, msg type:%s", TD_VID(pVnode), __func__, lino, + tstrerror(code), TMSG_INFO(pMsg->msgType)); + } else { + vTrace("vgId:%d, %s done, timestampSec:%d, nUids:%d", TD_VID(pVnode), __func__, ttlReq.timestampSec, ttlReq.nUids); + } + + return code; +} + extern int64_t tsMaxKeyByPrecision[]; static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t btimeMs, int64_t ctimeMs) { int32_t code = 0; @@ -371,6 +439,9 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { case TDMT_VND_ALTER_TABLE: { code = vnodePreProcessAlterTableMsg(pVnode, pMsg); } break; + case TDMT_VND_DROP_TTL_TABLE: { + code = vnodePreProcessDropTtlMsg(pVnode, pMsg); + } break; case TDMT_VND_SUBMIT: { code = vnodePreProcessSubmitMsg(pVnode, pMsg); } break; @@ -405,10 +476,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg return -1; } - vDebug("vgId:%d, start to process write request %s, index:%" PRId64 ", applied:%" PRId64 - ", state.applyTerm:%" PRId64 ", conn.applyTerm:%" PRId64, - TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver, pVnode->state.applied, - pVnode->state.applyTerm, pMsg->info.conn.applyTerm); + vDebug("vgId:%d, start to process write request %s, index:%" PRId64 ", applied:%" PRId64 ", state.applyTerm:%" PRId64 + ", conn.applyTerm:%" PRId64, + TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver, pVnode->state.applied, pVnode->state.applyTerm, + pMsg->info.conn.applyTerm); ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm); ASSERTS(pVnode->state.applied + 1 == ver, "applied:%" PRId64 ", ver:%" PRId64, pVnode->state.applied, ver); @@ -727,28 +798,27 @@ _exit: } static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { - SArray *tbUids = taosArrayInit(8, sizeof(int64_t)); - if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY; - SVDropTtlTableReq ttlReq = {0}; if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; goto end; } - vDebug("vgId:%d, drop ttl table req will be processed, time:%" PRId32, pVnode->config.vgId, ttlReq.timestampSec); - int32_t ret = metaTtlDropTable(pVnode->pMeta, (int64_t)ttlReq.timestampSec * 1000, tbUids); - if (ret != 0) { - goto end; - } - if (taosArrayGetSize(tbUids) > 0) { - tqUpdateTbUidList(pVnode->pTq, tbUids, false); + ASSERT(ttlReq.nUids == taosArrayGetSize(ttlReq.pTbUids)); + + if (ttlReq.nUids != 0) { + vInfo("vgId:%d, drop ttl table req will be processed, time:%d, ntbUids:%d", pVnode->config.vgId, + ttlReq.timestampSec, ttlReq.nUids); } - vnodeDoRetention(pVnode, ttlReq.timestampSec); + int ret = 0; + if (ttlReq.nUids > 0) { + metaDropTables(pVnode->pMeta, ttlReq.pTbUids); + tqUpdateTbUidList(pVnode->pTq, ttlReq.pTbUids, false); + } end: - taosArrayDestroy(tbUids); + taosArrayDestroy(ttlReq.pTbUids); return ret; } @@ -1482,7 +1552,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in code = tsdbInsertTableData(pVnode->pTsdb, ver, pSubmitTbData, &affectedRows); if (code) goto _exit; - code = metaUpdateChangeTime(pVnode->pMeta, pSubmitTbData->uid, pSubmitTbData->ctimeMs); + code = metaUpdateChangeTimeWithLock(pVnode->pMeta, pSubmitTbData->uid, pSubmitTbData->ctimeMs); if (code) goto _exit; pSubmitRsp->affectedRows += affectedRows; @@ -1739,7 +1809,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); } - code = metaUpdateChangeTime(pVnode->pMeta, uid, deleteReq.ctimeMs); + code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, deleteReq.ctimeMs); if (code < 0) { terrno = code; vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 @@ -1778,7 +1848,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid); code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey); if (code) goto _err; - code = metaUpdateChangeTime(pVnode->pMeta, uid, pRes->ctimeMs); + code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, pRes->ctimeMs); if (code) goto _err; }