Merge pull request #22526 from taosdata/enh/TD-25445-new

enh/TD-25445: ttl split flush and drop table
This commit is contained in:
wade zhang 2023-08-24 13:57:07 +08:00 committed by GitHub
commit 0860670d37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 378 additions and 174 deletions

View File

@ -130,6 +130,7 @@ extern bool tsKeepColumnName;
extern bool tsEnableQueryHb; extern bool tsEnableQueryHb;
extern bool tsEnableScience; extern bool tsEnableScience;
extern bool tsTtlChangeOnWrite; extern bool tsTtlChangeOnWrite;
extern int32_t tsTtlFlushThreshold;
extern int32_t tsRedirectPeriod; extern int32_t tsRedirectPeriod;
extern int32_t tsRedirectFactor; extern int32_t tsRedirectFactor;
extern int32_t tsRedirectMaxPeriod; extern int32_t tsRedirectMaxPeriod;
@ -186,7 +187,9 @@ extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval; extern int32_t tsMqRebalanceInterval;
extern int32_t tsStreamCheckpointTickInterval; extern int32_t tsStreamCheckpointTickInterval;
extern int32_t tsTtlUnit; extern int32_t tsTtlUnit;
extern int32_t tsTtlPushInterval; extern int32_t tsTtlPushIntervalSec;
extern int32_t tsTtlBatchDropNum;
extern int32_t tsTrimVDbIntervalSec;
extern int32_t tsGrantHBInterval; extern int32_t tsGrantHBInterval;
extern int32_t tsUptimeInterval; extern int32_t tsUptimeInterval;

View File

@ -1161,6 +1161,9 @@ int32_t tDeserializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq);
typedef struct { typedef struct {
int32_t timestampSec; int32_t timestampSec;
int32_t ttlDropMaxCount;
int32_t nUids;
SArray* pTbUids;
} SVDropTtlTableReq; } SVDropTtlTableReq;
int32_t tSerializeSVDropTtlTableReq(void* buf, int32_t bufLen, SVDropTtlTableReq* pReq); int32_t tSerializeSVDropTtlTableReq(void* buf, int32_t bufLen, SVDropTtlTableReq* pReq);

View File

@ -65,7 +65,7 @@ enum {
#define TD_NEW_MSG_SEG(TYPE) TYPE = ((TYPE##_SEG_CODE) << 8), #define TD_NEW_MSG_SEG(TYPE) TYPE = ((TYPE##_SEG_CODE) << 8),
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE, TYPE##_RSP, #define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE, TYPE##_RSP,
enum { enum { // WARN: new msg should be appended to segment tail
#endif #endif
TD_NEW_MSG_SEG(TDMT_DND_MSG) TD_NEW_MSG_SEG(TDMT_DND_MSG)
TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL)
@ -89,15 +89,15 @@ enum {
TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_NEW_MSG_SEG(TDMT_MND_MSG)
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "create-acct", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "create-acct", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_ACCT, "alter-acct", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_ACCT, "alter-acct", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_ACCT, "drop-acct", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_ACCT, "drop-acct", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_USER, "create-user", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_USER, "create-user", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_USER, "alter-user", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_USER, "alter-user", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_USER, "drop-user", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_USER, "drop-user", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_AUTH, "get-user-auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_AUTH, "get-user-auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_DNODE, "create-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_DNODE, "create-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_DNODE, "config-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_DNODE, "config-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_DNODE, "drop-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_DNODE, "drop-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_MNODE, "create-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_MNODE, "create-mnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_MNODE, "alter-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_MNODE, "alter-mnode", NULL, NULL)
@ -182,6 +182,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
@ -296,7 +297,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)

View File

@ -124,7 +124,6 @@ int32_t tsQueryRspPolicy = 0;
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
bool tsEnableQueryHb = true; bool tsEnableQueryHb = true;
bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true
bool tsTtlChangeOnWrite = false; // ttl delete time changes on last write if true
int32_t tsQuerySmaOptimize = 0; int32_t tsQuerySmaOptimize = 0;
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data. int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
bool tsQueryPlannerTrace = false; bool tsQueryPlannerTrace = false;
@ -226,12 +225,20 @@ bool tsStartUdfd = true;
// wal // wal
int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L); int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L);
// ttl
bool tsTtlChangeOnWrite = false; // if true, ttl delete time changes on last write
int32_t tsTtlFlushThreshold = 100; /* maximum number of dirty items in memory.
* if -1, flush will not be triggered by write-ops
*/
int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
// internal // internal
int32_t tsTransPullupInterval = 2; int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2; int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 1; int32_t tsStreamCheckpointTickInterval = 1;
int32_t tsTtlUnit = 86400; int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 3600; int32_t tsTtlPushIntervalSec = 10;
int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups
int32_t tsGrantHBInterval = 60; int32_t tsGrantHBInterval = 60;
int32_t tsUptimeInterval = 300; // seconds int32_t tsUptimeInterval = 300; // seconds
char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits
@ -604,8 +611,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushIntervalSec, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlBatchDropNum", tsTtlBatchDropNum, 0, INT32_MAX, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "ttlChangeOnWrite", tsTtlChangeOnWrite, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "ttlChangeOnWrite", tsTtlChangeOnWrite, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlFlushThreshold", tsTtlFlushThreshold, -1, 1000000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "trimVDbIntervalSec", tsTrimVDbIntervalSec, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER) != 0) return -1;
@ -994,6 +1004,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval; tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval;
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval; tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval;
tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32;
tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32; tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32;
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
@ -1003,7 +1014,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32; tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32;
tsTtlPushInterval = cfgGetItem(pCfg, "ttlPushInterval")->i32; tsTtlPushIntervalSec = cfgGetItem(pCfg, "ttlPushInterval")->i32;
tsTtlBatchDropNum = cfgGetItem(pCfg, "ttlBatchDropNum")->i32;
tsTrimVDbIntervalSec = cfgGetItem(pCfg, "trimVDbIntervalSec")->i32;
tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32;
tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32;
@ -1405,13 +1418,19 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
} else if (strcasecmp("ttlUnit", name) == 0) { } else if (strcasecmp("ttlUnit", name) == 0) {
tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32; tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32;
} else if (strcasecmp("ttlPushInterval", name) == 0) { } else if (strcasecmp("ttlPushInterval", name) == 0) {
tsTtlPushInterval = cfgGetItem(pCfg, "ttlPushInterval")->i32; tsTtlPushIntervalSec = cfgGetItem(pCfg, "ttlPushInterval")->i32;
} else if (strcasecmp("ttlBatchDropNum", name) == 0) {
tsTtlBatchDropNum = cfgGetItem(pCfg, "ttlBatchDropNum")->i32;
} else if (strcasecmp("trimVDbIntervalSec", name) == 0) {
tsTrimVDbIntervalSec = cfgGetItem(pCfg, "trimVDbIntervalSec")->i32;
} else if (strcasecmp("tmrDebugFlag", name) == 0) { } else if (strcasecmp("tmrDebugFlag", name) == 0) {
tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32; tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32;
} else if (strcasecmp("tsdbDebugFlag", name) == 0) { } else if (strcasecmp("tsdbDebugFlag", name) == 0) {
tsdbDebugFlag = cfgGetItem(pCfg, "tsdbDebugFlag")->i32; tsdbDebugFlag = cfgGetItem(pCfg, "tsdbDebugFlag")->i32;
} else if (strcasecmp("tqDebugFlag", name) == 0) { } else if (strcasecmp("tqDebugFlag", name) == 0) {
tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32; tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32;
} else if (strcasecmp("ttlFlushThreshold", name) == 0) {
tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32;
} }
break; break;
} }
@ -1613,6 +1632,20 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
return; return;
} }
if (strcasecmp(option, "ttlPushInterval") == 0) {
int32_t newTtlPushInterval = atoi(value);
uInfo("ttlPushInterval set from %d to %d", tsTtlPushIntervalSec, newTtlPushInterval);
tsTtlPushIntervalSec = newTtlPushInterval;
return;
}
if (strcasecmp(option, "ttlBatchDropNum") == 0) {
int32_t newTtlBatchDropNum = atoi(value);
uInfo("ttlBatchDropNum set from %d to %d", tsTtlBatchDropNum, newTtlBatchDropNum);
tsTtlBatchDropNum = newTtlBatchDropNum;
return;
}
const char *options[] = { const char *options[] = {
"dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag", "tsdbDebugFlag", "tqDebugFlag", "dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag", "tsdbDebugFlag", "tqDebugFlag",
"fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag", "tdbDebugFlag", "tmrDebugFlag", "uDebugFlag", "fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag", "tdbDebugFlag", "tmrDebugFlag", "uDebugFlag",

View File

@ -3179,6 +3179,12 @@ int32_t tSerializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableReq
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->timestampSec) < 0) return -1; if (tEncodeI32(&encoder, pReq->timestampSec) < 0) return -1;
if (tEncodeI32(&encoder, pReq->ttlDropMaxCount) < 0) return -1;
if (tEncodeI32(&encoder, pReq->nUids) < 0) return -1;
for (int32_t i = 0; i < pReq->nUids; ++i) {
tb_uid_t *pTbUid = taosArrayGet(pReq->pTbUids, i);
if (tEncodeI64(&encoder, *pTbUid) < 0) return -1;
}
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -3192,6 +3198,30 @@ int32_t tDeserializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableR
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->timestampSec) < 0) return -1; if (tDecodeI32(&decoder, &pReq->timestampSec) < 0) return -1;
pReq->ttlDropMaxCount = INT32_MAX;
pReq->nUids = 0;
pReq->pTbUids = NULL;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pReq->ttlDropMaxCount) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->nUids) < 0) return -1;
if (pReq->nUids > 0) {
pReq->pTbUids = taosArrayInit(pReq->nUids, sizeof(tb_uid_t));
if (pReq->pTbUids == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
tb_uid_t tbUid = 0;
for (int32_t i = 0; i < pReq->nUids; ++i) {
if (tDecodeI64(&decoder, &tbUid) < 0) return -1;
if (taosArrayPush(pReq->pTbUids, &tbUid) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);

View File

@ -33,10 +33,10 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
return -1; return -1;
} }
SMnodeOpt option = {.deploy = true, .numOfReplicas = createReq.replica, SMnodeOpt option = {.deploy = true, .numOfReplicas = createReq.replica,
.numOfTotalReplicas = createReq.replica + createReq.learnerReplica, .numOfTotalReplicas = createReq.replica + createReq.learnerReplica,
.selfIndex = -1, .lastIndex = createReq.lastIndex}; .selfIndex = -1, .lastIndex = createReq.lastIndex};
memcpy(option.replicas, createReq.replicas, sizeof(createReq.replicas)); memcpy(option.replicas, createReq.replicas, sizeof(createReq.replicas));
for (int32_t i = 0; i < createReq.replica; ++i) { for (int32_t i = 0; i < createReq.replica; ++i) {
if (createReq.replicas[i].id == pInput->pData->dnodeId) { if (createReq.replicas[i].id == pInput->pData->dnodeId) {
@ -191,6 +191,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -1123,6 +1123,36 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
strcpy(dcfgReq.config, "keeptimeoffset"); strcpy(dcfgReq.config, "keeptimeoffset");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
} else if (strncasecmp(cfgReq.config, "ttlpushinterval", 14) == 0) {
int32_t optLen = strlen("ttlpushinterval");
int32_t flag = -1;
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
if (code < 0) return code;
if (flag < 0 || flag > 100000) {
mError("dnode:%d, failed to config ttlPushInterval since value:%d. Valid range: [0, 100000]", cfgReq.dnodeId,
flag);
terrno = TSDB_CODE_INVALID_CFG;
return -1;
}
strcpy(dcfgReq.config, "ttlpushinterval");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
} else if (strncasecmp(cfgReq.config, "ttlbatchdropnum", 15) == 0) {
int32_t optLen = strlen("ttlbatchdropnum");
int32_t flag = -1;
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
if (code < 0) return code;
if (flag < 0) {
mError("dnode:%d, failed to config ttlBatchDropNum since value:%d. Valid range: [0, %d]", cfgReq.dnodeId,
flag, INT32_MAX);
terrno = TSDB_CODE_INVALID_CFG;
return -1;
}
strcpy(dcfgReq.config, "ttlbatchdropnum");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
} else if (strncasecmp(cfgReq.config, "activeCode", 10) == 0 || strncasecmp(cfgReq.config, "cActiveCode", 11) == 0) { } else if (strncasecmp(cfgReq.config, "activeCode", 10) == 0 || strncasecmp(cfgReq.config, "cActiveCode", 11) == 0) {
int8_t opt = strncasecmp(cfgReq.config, "a", 1) == 0 ? DND_ACTIVE_CODE : DND_CONN_ACTIVE_CODE; int8_t opt = strncasecmp(cfgReq.config, "a", 1) == 0 ? DND_ACTIVE_CODE : DND_CONN_ACTIVE_CODE;
@ -1376,7 +1406,7 @@ static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t opLen, int32_
return 0; return 0;
_err: _err:
mError("dnode:%d, failed to config keeptimeoffset since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config); mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
terrno = TSDB_CODE_INVALID_CFG; terrno = TSDB_CODE_INVALID_CFG;
return -1; return -1;
} }

View File

@ -119,6 +119,14 @@ static void mndPullupTtl(SMnode *pMnode) {
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} }
static void mndPullupTrimDb(SMnode *pMnode) {
mTrace("pullup trim");
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRIM_DB_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
static void mndCalMqRebalance(SMnode *pMnode) { static void mndCalMqRebalance(SMnode *pMnode) {
mTrace("calc mq rebalance"); mTrace("calc mq rebalance");
int32_t contLen = 0; int32_t contLen = 0;
@ -255,10 +263,14 @@ static void *mndThreadFp(void *param) {
if (lastTime % 10 != 0) continue; if (lastTime % 10 != 0) continue;
int64_t sec = lastTime / 10; int64_t sec = lastTime / 10;
if (sec % tsTtlPushInterval == 0) { if (sec % tsTtlPushIntervalSec == 0) {
mndPullupTtl(pMnode); mndPullupTtl(pMnode);
} }
if (sec % tsTrimVDbIntervalSec == 0) {
mndPullupTrimDb(pMnode);
}
if (sec % tsTransPullupInterval == 0) { if (sec % tsTransPullupInterval == 0) {
mndPullupTrans(pMnode); mndPullupTrans(pMnode);
} }
@ -661,7 +673,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
_OVER: _OVER:
if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
pMsg->msgType == TDMT_MND_UPTIME_TIMER) { pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
pMnode->stopped, state.restored, syncStr(state.state)); pMnode->stopped, state.restored, syncStr(state.state));
return -1; return -1;

View File

@ -40,10 +40,12 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew); static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
static int32_t mndProcessTtlTimer(SRpcMsg *pReq); static int32_t mndProcessTtlTimer(SRpcMsg *pReq);
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq);
static int32_t mndProcessCreateStbReq(SRpcMsg *pReq); static int32_t mndProcessCreateStbReq(SRpcMsg *pReq);
static int32_t mndProcessAlterStbReq(SRpcMsg *pReq); static int32_t mndProcessAlterStbReq(SRpcMsg *pReq);
static int32_t mndProcessDropStbReq(SRpcMsg *pReq); static int32_t mndProcessDropStbReq(SRpcMsg *pReq);
static int32_t mndProcessDropTtltbReq(SRpcMsg *pReq); static int32_t mndProcessDropTtltbRsp(SRpcMsg *pReq);
static int32_t mndProcessTrimDbRsp(SRpcMsg *pReq);
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq); static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
@ -72,11 +74,13 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq); mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbReq); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbRsp);
mndSetMsgHandle(pMnode, TDMT_VND_TRIM_RSP, mndProcessTrimDbRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq); mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
mndSetMsgHandle(pMnode, TDMT_MND_TTL_TIMER, mndProcessTtlTimer); mndSetMsgHandle(pMnode, TDMT_MND_TTL_TIMER, mndProcessTtlTimer);
mndSetMsgHandle(pMnode, TDMT_MND_TRIM_DB_TIMER, mndProcessTrimDbTimer);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq); mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq);
// mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq); // mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);
@ -919,11 +923,12 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
void *pIter = NULL; void *pIter = NULL;
SVDropTtlTableReq ttlReq = {.timestampSec = taosGetTimestampSec()}; SVDropTtlTableReq ttlReq = {
int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq); .timestampSec = taosGetTimestampSec(), .ttlDropMaxCount = tsTtlBatchDropNum, .nUids = 0, .pTbUids = NULL};
int32_t contLen = reqLen + sizeof(SMsgHead); int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq);
int32_t contLen = reqLen + sizeof(SMsgHead);
mInfo("start to process ttl timer"); mDebug("start to process ttl timer");
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
@ -936,7 +941,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
} }
pHead->contLen = htonl(contLen); pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId); pHead->vgId = htonl(pVgroup->vgId);
tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), contLen, &ttlReq); tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), reqLen, &ttlReq);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen, .info = pReq->info}; SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen, .info = pReq->info};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
@ -944,7 +949,44 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
if (code != 0) { if (code != 0) {
mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code); mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code);
} else { } else {
mInfo("vgId:%d, send drop ttl table request to vnode, time:%" PRId32, pVgroup->vgId, ttlReq.timestampSec); mDebug("vgId:%d, send drop ttl table request to vnode, time:%" PRId32, pVgroup->vgId, ttlReq.timestampSec);
}
sdbRelease(pSdb, pVgroup);
}
return 0;
}
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
SVTrimDbReq trimReq = {.timestamp = taosGetTimestampSec()};
int32_t reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq);
int32_t contLen = reqLen + sizeof(SMsgHead);
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
SMsgHead *pHead = rpcMallocCont(contLen);
if (pHead == NULL) {
sdbCancelFetch(pSdb, pVgroup);
sdbRelease(pSdb, pVgroup);
continue;
}
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), reqLen, &trimReq);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
if (code != 0) {
mError("vgId:%d, timer failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code);
} else {
mInfo("vgId:%d, timer send vnode-trim request to vnode, time:%d", pVgroup->vgId, trimReq.timestamp);
} }
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
@ -2405,7 +2447,8 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
return 0; return 0;
} }
static int32_t mndProcessDropTtltbReq(SRpcMsg *pRsp) { return 0; } static int32_t mndProcessDropTtltbRsp(SRpcMsg *pRsp) { return 0; }
static int32_t mndProcessTrimDbRsp(SRpcMsg *pRsp) { return 0; }
static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;

View File

@ -31,15 +31,14 @@ typedef enum DirtyEntryType {
} DirtyEntryType; } DirtyEntryType;
typedef struct STtlManger { typedef struct STtlManger {
TdThreadRwlock lock; TTB* pOldTtlIdx; // btree<{deleteTime, tuid}, NULL>
TTB* pOldTtlIdx; // btree<{deleteTime, tuid}, NULL> SHashObj* pTtlCache; // hash<tuid, {ttl, ctime}>
SHashObj* pDirtyUids; // hash<dirtyTuid, entryType>
SHashObj* pTtlCache; // key: tuid, value: {ttl, ctime}
SHashObj* pDirtyUids; // dirty tuid
TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl> TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl>
char* logPrefix; char* logPrefix;
int32_t flushThreshold; // max dirty entry number in memory. if -1, flush will not be triggered by write-ops
} STtlManger; } STtlManger;
typedef struct { typedef struct {
@ -68,23 +67,24 @@ typedef struct {
typedef struct { typedef struct {
tb_uid_t uid; tb_uid_t uid;
int64_t changeTimeMs; int64_t changeTimeMs;
TXN* pTxn;
} STtlUpdCtimeCtx; } STtlUpdCtimeCtx;
typedef struct { typedef struct {
tb_uid_t uid; tb_uid_t uid;
int64_t changeTimeMs; int64_t changeTimeMs;
int64_t ttlDays; int64_t ttlDays;
TXN* pTxn;
} STtlUpdTtlCtx; } STtlUpdTtlCtx;
typedef struct { typedef struct {
tb_uid_t uid; tb_uid_t uid;
TXN* pTxn;
int64_t ttlDays; int64_t ttlDays;
TXN* pTxn;
} STtlDelTtlCtx; } STtlDelTtlCtx;
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback, const char* logPrefix); int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback, const char* logPrefix, int32_t flushThreshold);
void ttlMgrClose(STtlManger* pTtlMgr); void ttlMgrClose(STtlManger* pTtlMgr);
int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta);
bool ttlMgrNeedUpgrade(TDB* pEnv); bool ttlMgrNeedUpgrade(TDB* pEnv);
int ttlMgrUpgrade(STtlManger* pTtlMgr, void* pMeta); int ttlMgrUpgrade(STtlManger* pTtlMgr, void* pMeta);
@ -94,7 +94,7 @@ int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx);
int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx); int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx);
int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn); int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn);
int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids); int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids, int32_t ttlDropMaxCount);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -151,9 +151,10 @@ int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
int32_t metaTrimTables(SMeta* pMeta); int32_t metaTrimTables(SMeta* pMeta);
int metaTtlDropTable(SMeta* pMeta, int64_t timePointMs, SArray* tbUids); void metaDropTables(SMeta* pMeta, SArray* tbUids);
int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
int metaUpdateChangeTime(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs); int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema); int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);

View File

@ -130,7 +130,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
// open pTtlMgr ("ttlv1.idx") // open pTtlMgr ("ttlv1.idx")
char logPrefix[128] = {0}; char logPrefix[128] = {0};
sprintf(logPrefix, "vgId:%d", TD_VID(pVnode)); sprintf(logPrefix, "vgId:%d", TD_VID(pVnode));
ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0, logPrefix); ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0, logPrefix, tsTtlFlushThreshold);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;

View File

@ -21,6 +21,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs);
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME); static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME);
@ -842,9 +843,11 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
return 0; return 0;
} }
static void metaDropTables(SMeta *pMeta, SArray *tbUids) { void metaDropTables(SMeta *pMeta, SArray *tbUids) {
if (taosArrayGetSize(tbUids) == 0) return;
metaWLock(pMeta); metaWLock(pMeta);
for (int i = 0; i < TARRAY_SIZE(tbUids); ++i) { for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i); tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i);
metaDropTableByUid(pMeta, uid, NULL); metaDropTableByUid(pMeta, uid, NULL);
metaDebug("batch drop table:%" PRId64, uid); metaDebug("batch drop table:%" PRId64, uid);
@ -927,26 +930,23 @@ end:
return code; return code;
} }
int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) { int metaTtlFindExpired(SMeta *pMeta, int64_t timePointMs, SArray *tbUids, int32_t ttlDropMaxCount) {
metaWLock(pMeta);
int ret = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn); int ret = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
if (ret != 0) { if (ret != 0) {
metaError("ttl failed to flush, ret:%d", ret); metaError("ttl failed to flush, ret:%d", ret);
return ret; goto _err;
} }
ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids); ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids, ttlDropMaxCount);
if (ret != 0) { if (ret != 0) {
metaError("ttl failed to find expired table, ret:%d", ret); metaError("ttl failed to find expired table, ret:%d", ret);
return ret; goto _err;
}
if (TARRAY_SIZE(tbUids) == 0) {
return 0;
} }
metaInfo("ttl find expired table count: %zu", TARRAY_SIZE(tbUids)); _err:
metaULock(pMeta);
metaDropTables(pMeta, tbUids); return ret;
return 0;
} }
static int metaBuildBtimeIdxKey(SBtimeIdxKey *btimeKey, const SMetaEntry *pME) { static int metaBuildBtimeIdxKey(SBtimeIdxKey *btimeKey, const SMetaEntry *pME) {
@ -1326,10 +1326,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
metaSaveToSkmDb(pMeta, &entry); metaSaveToSkmDb(pMeta, &entry);
metaULock(pMeta);
metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs); metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs);
metaULock(pMeta);
metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp); metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp);
if (entry.pBuf) taosMemoryFree(entry.pBuf); if (entry.pBuf) taosMemoryFree(entry.pBuf);
@ -1515,10 +1515,10 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid); metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid);
metaTbGroupCacheClear(pMeta, ctbEntry.ctbEntry.suid); metaTbGroupCacheClear(pMeta, ctbEntry.ctbEntry.suid);
metaULock(pMeta);
metaUpdateChangeTime(pMeta, ctbEntry.uid, pAlterTbReq->ctimeMs); metaUpdateChangeTime(pMeta, ctbEntry.uid, pAlterTbReq->ctimeMs);
metaULock(pMeta);
tDecoderClear(&dc1); tDecoderClear(&dc1);
tDecoderClear(&dc2); tDecoderClear(&dc2);
taosMemoryFree((void *)ctbEntry.ctbEntry.pTags); taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
@ -1630,10 +1630,10 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
// save to table db // save to table db
metaSaveToTbDb(pMeta, &entry); metaSaveToTbDb(pMeta, &entry);
metaUpdateUidIdx(pMeta, &entry); metaUpdateUidIdx(pMeta, &entry);
metaULock(pMeta);
metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs); metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs);
metaULock(pMeta);
tdbTbcClose(pTbDbc); tdbTbcClose(pTbDbc);
tdbTbcClose(pUidIdxc); tdbTbcClose(pUidIdxc);
tDecoderClear(&dc); tDecoderClear(&dc);
@ -1981,7 +1981,7 @@ static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0; if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0;
STtlUpdTtlCtx ctx = {.uid = pME->uid}; STtlUpdTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn};
if (pME->type == TSDB_CHILD_TABLE) { if (pME->type == TSDB_CHILD_TABLE) {
ctx.ttlDays = pME->ctbEntry.ttlDays; ctx.ttlDays = pME->ctbEntry.ttlDays;
ctx.changeTimeMs = pME->ctbEntry.btime; ctx.changeTimeMs = pME->ctbEntry.btime;
@ -1993,7 +1993,7 @@ static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
return ttlMgrInsertTtl(pMeta->pTtlMgr, &ctx); return ttlMgrInsertTtl(pMeta->pTtlMgr, &ctx);
} }
int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) { static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) {
if (!tsTtlChangeOnWrite) return 0; if (!tsTtlChangeOnWrite) return 0;
if (changeTimeMs <= 0) { if (changeTimeMs <= 0) {
@ -2001,11 +2001,20 @@ int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) {
return TSDB_CODE_VERSION_NOT_COMPATIBLE; return TSDB_CODE_VERSION_NOT_COMPATIBLE;
} }
STtlUpdCtimeCtx ctx = {.uid = uid, .changeTimeMs = changeTimeMs}; STtlUpdCtimeCtx ctx = {.uid = uid, .changeTimeMs = changeTimeMs, .pTxn = pMeta->txn};
return ttlMgrUpdateChangeTime(pMeta->pTtlMgr, &ctx); return ttlMgrUpdateChangeTime(pMeta->pTtlMgr, &ctx);
} }
int metaUpdateChangeTimeWithLock(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) {
if (!tsTtlChangeOnWrite) return 0;
metaWLock(pMeta);
int ret = metaUpdateChangeTime(pMeta, uid, changeTimeMs);
metaULock(pMeta);
return ret;
}
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid}; SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid};

View File

@ -21,6 +21,13 @@ typedef struct {
SMeta *pMeta; SMeta *pMeta;
} SConvertData; } SConvertData;
typedef struct {
int32_t ttlDropMaxCount;
int32_t count;
STtlIdxKeyV1 expiredKey;
SArray *pTbUids;
} STtlExpiredCtx;
static void ttlMgrCleanup(STtlManger *pTtlMgr); static void ttlMgrCleanup(STtlManger *pTtlMgr);
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta); static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta);
@ -31,15 +38,15 @@ static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2,
static int ttlMgrFillCache(STtlManger *pTtlMgr); static int ttlMgrFillCache(STtlManger *pTtlMgr);
static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pTtlCache); static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pTtlCache);
static int32_t ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData); static int32_t ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData);
static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen,
void *pExpiredInfo);
static int32_t ttlMgrWLock(STtlManger *pTtlMgr); static bool ttlMgrNeedFlush(STtlManger *pTtlMgr);
static int32_t ttlMgrRLock(STtlManger *pTtlMgr);
static int32_t ttlMgrULock(STtlManger *pTtlMgr);
const char *ttlTbname = "ttl.idx"; const char *ttlTbname = "ttl.idx";
const char *ttlV1Tbname = "ttlv1.idx"; const char *ttlV1Tbname = "ttlv1.idx";
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix) { int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix, int32_t flushThreshold) {
int ret = TSDB_CODE_SUCCESS; int ret = TSDB_CODE_SUCCESS;
int64_t startNs = taosGetTimestampNs(); int64_t startNs = taosGetTimestampNs();
@ -55,6 +62,7 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *lo
} }
strcpy(logBuffer, logPrefix); strcpy(logBuffer, logPrefix);
pTtlMgr->logPrefix = logBuffer; pTtlMgr->logPrefix = logBuffer;
pTtlMgr->flushThreshold = flushThreshold;
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback); ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
if (ret < 0) { if (ret < 0) {
@ -66,8 +74,6 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *lo
pTtlMgr->pTtlCache = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); pTtlMgr->pTtlCache = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
pTtlMgr->pDirtyUids = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); pTtlMgr->pDirtyUids = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosThreadRwlockInit(&pTtlMgr->lock, NULL);
ret = ttlMgrFillCache(pTtlMgr); ret = ttlMgrFillCache(pTtlMgr);
if (ret < 0) { if (ret < 0) {
metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno)); metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
@ -130,6 +136,7 @@ int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
int64_t endNs = taosGetTimestampNs(); int64_t endNs = taosGetTimestampNs();
metaInfo("%s, ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, metaInfo("%s, ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs); taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
_out: _out:
tdbTbClose(pTtlMgr->pOldTtlIdx); tdbTbClose(pTtlMgr->pOldTtlIdx);
pTtlMgr->pOldTtlIdx = NULL; pTtlMgr->pOldTtlIdx = NULL;
@ -142,7 +149,6 @@ static void ttlMgrCleanup(STtlManger *pTtlMgr) {
taosHashCleanup(pTtlMgr->pTtlCache); taosHashCleanup(pTtlMgr->pTtlCache);
taosHashCleanup(pTtlMgr->pDirtyUids); taosHashCleanup(pTtlMgr->pDirtyUids);
tdbTbClose(pTtlMgr->pTtlIdx); tdbTbClose(pTtlMgr->pTtlIdx);
taosThreadRwlockDestroy(&pTtlMgr->lock);
taosMemoryFree(pTtlMgr); taosMemoryFree(pTtlMgr);
} }
@ -229,10 +235,25 @@ static int ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal,
} }
ret = 0; ret = 0;
_out: _out:
return ret; return ret;
} }
static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen,
void *pExpiredCtx) {
STtlExpiredCtx *pCtx = (STtlExpiredCtx *)pExpiredCtx;
if (pCtx->count >= pCtx->ttlDropMaxCount) return -1;
int c = ttlIdxKeyV1Cmpr(&pCtx->expiredKey, sizeof(pCtx->expiredKey), pKey, keyLen);
if (c > 0) {
taosArrayPush(pCtx->pTbUids, &((STtlIdxKeyV1 *)pKey)->uid);
pCtx->count++;
}
return c;
}
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) { static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
SMeta *meta = pMeta; SMeta *meta = pMeta;
@ -255,8 +276,6 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays, .changeTimeMs = updCtx->changeTimeMs}; STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays, .changeTimeMs = updCtx->changeTimeMs};
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT}; STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
ttlMgrWLock(pTtlMgr);
int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry)); int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
if (ret < 0) { if (ret < 0) {
metaError("%s, ttlMgr insert failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno)); metaError("%s, ttlMgr insert failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
@ -269,10 +288,13 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
goto _out; goto _out;
} }
ret = 0; if (ttlMgrNeedFlush(pTtlMgr)) {
_out: ttlMgrFlush(pTtlMgr, updCtx->pTxn);
ttlMgrULock(pTtlMgr); }
ret = 0;
_out:
metaDebug("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix, metaDebug("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays); updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
@ -281,7 +303,6 @@ _out:
int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) { int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
if (delCtx->ttlDays == 0) return 0; if (delCtx->ttlDays == 0) return 0;
ttlMgrWLock(pTtlMgr);
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL}; STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL};
@ -291,18 +312,19 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
goto _out; goto _out;
} }
ret = 0; if (ttlMgrNeedFlush(pTtlMgr)) {
_out: ttlMgrFlush(pTtlMgr, delCtx->pTxn);
ttlMgrULock(pTtlMgr); }
ret = 0;
_out:
metaDebug("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid); metaDebug("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
return ret; return ret;
} }
int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) { int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) {
ttlMgrWLock(pTtlMgr);
int ret = 0; int ret = 0;
STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid)); STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid));
@ -327,59 +349,35 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
goto _out; goto _out;
} }
ret = 0; if (ttlMgrNeedFlush(pTtlMgr)) {
_out: ttlMgrFlush(pTtlMgr, pUpdCtimeCtx->pTxn);
ttlMgrULock(pTtlMgr); }
ret = 0;
_out:
metaDebug("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid, metaDebug("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
pUpdCtimeCtx->changeTimeMs); pUpdCtimeCtx->changeTimeMs);
return ret; return ret;
} }
int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids) { int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) {
ttlMgrRLock(pTtlMgr); STtlIdxKeyV1 ttlKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX};
STtlExpiredCtx expiredCtx = {
.ttlDropMaxCount = ttlDropMaxCount, .count = 0, .expiredKey = ttlKey, .pTbUids = pTbUids};
return tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry);
}
TBC *pCur; static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) {
int ret = tdbTbcOpen(pTtlMgr->pTtlIdx, &pCur, NULL); return pTtlMgr->flushThreshold > 0 && taosHashGetSize(pTtlMgr->pDirtyUids) > pTtlMgr->flushThreshold;
if (ret < 0) {
goto _out;
}
STtlIdxKeyV1 ttlKey = {0};
ttlKey.deleteTimeMs = timePointMs;
ttlKey.uid = INT64_MAX;
int c = 0;
tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &c);
if (c < 0) {
tdbTbcMoveToPrev(pCur);
}
void *pKey = NULL;
int kLen = 0;
while (1) {
ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL);
if (ret < 0) {
ret = 0;
break;
}
ttlKey = *(STtlIdxKeyV1 *)pKey;
taosArrayPush(pTbUids, &ttlKey.uid);
}
tdbFree(pKey);
tdbTbcClose(pCur);
ret = 0;
_out:
ttlMgrULock(pTtlMgr);
return ret;
} }
int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
ttlMgrWLock(pTtlMgr); int64_t startNs = taosGetTimestampNs();
int64_t endNs = startNs;
metaDebug("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids)); metaTrace("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));
int ret = -1; int ret = -1;
@ -430,40 +428,10 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
taosHashClear(pTtlMgr->pDirtyUids); taosHashClear(pTtlMgr->pDirtyUids);
ret = 0; ret = 0;
_out: _out:
ttlMgrULock(pTtlMgr); endNs = taosGetTimestampNs();
metaTrace("%s, ttl mgr flush end, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, endNs - startNs);
metaDebug("%s, ttl mgr flush end.", pTtlMgr->logPrefix);
return ret;
}
static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("%s, ttlMgr rlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
ret = taosThreadRwlockRdlock(&pTtlMgr->lock);
return ret;
}
static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("%s, ttlMgr wlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
ret = taosThreadRwlockWrlock(&pTtlMgr->lock);
return ret;
}
static int32_t ttlMgrULock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("%s, ttlMgr ulock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
ret = taosThreadRwlockUnlock(&pTtlMgr->lock);
return ret; return ret;
} }

View File

@ -142,6 +142,74 @@ _exit:
return code; return code;
} }
static int32_t vnodePreProcessDropTtlMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_INVALID_MSG;
int32_t lino = 0;
SMsgHead *pContOld = pMsg->pCont;
int32_t reqLenOld = pMsg->contLen - sizeof(SMsgHead);
SArray *tbUids = NULL;
int64_t timestampMs = 0;
SVDropTtlTableReq ttlReq = {0};
if (tDeserializeSVDropTtlTableReq((char *)pContOld + sizeof(SMsgHead), reqLenOld, &ttlReq) != 0) {
code = TSDB_CODE_INVALID_MSG;
TSDB_CHECK_CODE(code, lino, _exit);
}
{ // find expired uids
tbUids = taosArrayInit(8, sizeof(int64_t));
if (tbUids == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
timestampMs = (int64_t)ttlReq.timestampSec * 1000;
code = metaTtlFindExpired(pVnode->pMeta, timestampMs, tbUids, ttlReq.ttlDropMaxCount);
if (code != 0) {
code = TSDB_CODE_INVALID_MSG;
TSDB_CHECK_CODE(code, lino, _exit);
}
ttlReq.nUids = taosArrayGetSize(tbUids);
ttlReq.pTbUids = tbUids;
}
{ // prepare new content
int32_t reqLenNew = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq);
int32_t contLenNew = reqLenNew + sizeof(SMsgHead);
SMsgHead *pContNew = rpcMallocCont(contLenNew);
if (pContNew == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
tSerializeSVDropTtlTableReq((char *)pContNew + sizeof(SMsgHead), reqLenNew, &ttlReq);
pContNew->contLen = htonl(reqLenNew);
pContNew->vgId = pContOld->vgId;
rpcFreeCont(pContOld);
pMsg->pCont = pContNew;
pMsg->contLen = contLenNew;
}
code = 0;
_exit:
taosArrayDestroy(tbUids);
if (code) {
vError("vgId:%d, %s:%d failed to preprocess drop ttl request since %s, msg type:%s", TD_VID(pVnode), __func__, lino,
tstrerror(code), TMSG_INFO(pMsg->msgType));
} else {
vTrace("vgId:%d, %s done, timestampSec:%d, nUids:%d", TD_VID(pVnode), __func__, ttlReq.timestampSec, ttlReq.nUids);
}
return code;
}
extern int64_t tsMaxKeyByPrecision[]; extern int64_t tsMaxKeyByPrecision[];
static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t btimeMs, int64_t ctimeMs) { static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t btimeMs, int64_t ctimeMs) {
int32_t code = 0; int32_t code = 0;
@ -371,6 +439,9 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
case TDMT_VND_ALTER_TABLE: { case TDMT_VND_ALTER_TABLE: {
code = vnodePreProcessAlterTableMsg(pVnode, pMsg); code = vnodePreProcessAlterTableMsg(pVnode, pMsg);
} break; } break;
case TDMT_VND_DROP_TTL_TABLE: {
code = vnodePreProcessDropTtlMsg(pVnode, pMsg);
} break;
case TDMT_VND_SUBMIT: { case TDMT_VND_SUBMIT: {
code = vnodePreProcessSubmitMsg(pVnode, pMsg); code = vnodePreProcessSubmitMsg(pVnode, pMsg);
} break; } break;
@ -405,10 +476,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
return -1; return -1;
} }
vDebug("vgId:%d, start to process write request %s, index:%" PRId64 ", applied:%" PRId64 vDebug("vgId:%d, start to process write request %s, index:%" PRId64 ", applied:%" PRId64 ", state.applyTerm:%" PRId64
", state.applyTerm:%" PRId64 ", conn.applyTerm:%" PRId64, ", conn.applyTerm:%" PRId64,
TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver, pVnode->state.applied, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver, pVnode->state.applied, pVnode->state.applyTerm,
pVnode->state.applyTerm, pMsg->info.conn.applyTerm); pMsg->info.conn.applyTerm);
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm); ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
ASSERTS(pVnode->state.applied + 1 == ver, "applied:%" PRId64 ", ver:%" PRId64, pVnode->state.applied, ver); ASSERTS(pVnode->state.applied + 1 == ver, "applied:%" PRId64 ", ver:%" PRId64, pVnode->state.applied, ver);
@ -727,28 +798,27 @@ _exit:
} }
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;
SVDropTtlTableReq ttlReq = {0}; SVDropTtlTableReq ttlReq = {0};
if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) { if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto end; goto end;
} }
vDebug("vgId:%d, drop ttl table req will be processed, time:%" PRId32, pVnode->config.vgId, ttlReq.timestampSec); ASSERT(ttlReq.nUids == taosArrayGetSize(ttlReq.pTbUids));
int32_t ret = metaTtlDropTable(pVnode->pMeta, (int64_t)ttlReq.timestampSec * 1000, tbUids);
if (ret != 0) { if (ttlReq.nUids != 0) {
goto end; vInfo("vgId:%d, drop ttl table req will be processed, time:%d, ntbUids:%d", pVnode->config.vgId,
} ttlReq.timestampSec, ttlReq.nUids);
if (taosArrayGetSize(tbUids) > 0) {
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
} }
vnodeDoRetention(pVnode, ttlReq.timestampSec); int ret = 0;
if (ttlReq.nUids > 0) {
metaDropTables(pVnode->pMeta, ttlReq.pTbUids);
tqUpdateTbUidList(pVnode->pTq, ttlReq.pTbUids, false);
}
end: end:
taosArrayDestroy(tbUids); taosArrayDestroy(ttlReq.pTbUids);
return ret; return ret;
} }
@ -1482,7 +1552,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
code = tsdbInsertTableData(pVnode->pTsdb, ver, pSubmitTbData, &affectedRows); code = tsdbInsertTableData(pVnode->pTsdb, ver, pSubmitTbData, &affectedRows);
if (code) goto _exit; if (code) goto _exit;
code = metaUpdateChangeTime(pVnode->pMeta, pSubmitTbData->uid, pSubmitTbData->ctimeMs); code = metaUpdateChangeTimeWithLock(pVnode->pMeta, pSubmitTbData->uid, pSubmitTbData->ctimeMs);
if (code) goto _exit; if (code) goto _exit;
pSubmitRsp->affectedRows += affectedRows; pSubmitRsp->affectedRows += affectedRows;
@ -1739,7 +1809,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
} }
code = metaUpdateChangeTime(pVnode->pMeta, uid, deleteReq.ctimeMs); code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, deleteReq.ctimeMs);
if (code < 0) { if (code < 0) {
terrno = code; terrno = code;
vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64
@ -1778,7 +1848,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in
uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid); uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid);
code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey); code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey);
if (code) goto _err; if (code) goto _err;
code = metaUpdateChangeTime(pVnode->pMeta, uid, pRes->ctimeMs); code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, pRes->ctimeMs);
if (code) goto _err; if (code) goto _err;
} }