Merge pull request #22770 from taosdata/fix/TD-25445-main

fix/TD-25445: search memory in ttlMgrFindExpired
This commit is contained in:
wade zhang 2023-09-07 16:13:32 +08:00 committed by GitHub
commit d3f0c5ac3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 138 additions and 16 deletions

View File

@ -52,7 +52,9 @@ int metaFinishCommit(SMeta *pMeta, TXN *txn) { return tdbPostCommit(pMeta->pEnv
int metaPrepareAsyncCommit(SMeta *pMeta) { int metaPrepareAsyncCommit(SMeta *pMeta) {
// return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn); // return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn);
int code = 0; int code = 0;
metaWLock(pMeta);
code = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn); code = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
metaULock(pMeta);
code = tdbCommit(pMeta->pEnv, pMeta->txn); code = tdbCommit(pMeta->pEnv, pMeta->txn);
return code; return code;

View File

@ -931,21 +931,16 @@ end:
} }
int metaTtlFindExpired(SMeta *pMeta, int64_t timePointMs, SArray *tbUids, int32_t ttlDropMaxCount) { int metaTtlFindExpired(SMeta *pMeta, int64_t timePointMs, SArray *tbUids, int32_t ttlDropMaxCount) {
metaWLock(pMeta); metaRLock(pMeta);
int ret = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
if (ret != 0) { int ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids, ttlDropMaxCount);
metaError("ttl failed to flush, ret:%d", ret);
goto _err; metaULock(pMeta);
}
ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids, ttlDropMaxCount);
if (ret != 0) { if (ret != 0) {
metaError("ttl failed to find expired table, ret:%d", ret); metaError("ttl failed to find expired table, ret:%d", ret);
goto _err;
} }
_err:
metaULock(pMeta);
return ret; return ret;
} }

View File

@ -299,7 +299,7 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
ret = 0; ret = 0;
_out: _out:
metaDebug("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix, metaTrace("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays); updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
return ret; return ret;
@ -323,7 +323,7 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
ret = 0; ret = 0;
_out: _out:
metaDebug("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid); metaTrace("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
return ret; return ret;
} }
@ -363,17 +363,37 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
ret = 0; ret = 0;
_out: _out:
metaDebug("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid, metaTrace("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
pUpdCtimeCtx->changeTimeMs); pUpdCtimeCtx->changeTimeMs);
return ret; return ret;
} }
int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) { int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) {
int ret = -1;
STtlIdxKeyV1 ttlKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX}; STtlIdxKeyV1 ttlKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX};
STtlExpiredCtx expiredCtx = { STtlExpiredCtx expiredCtx = {
.ttlDropMaxCount = ttlDropMaxCount, .count = 0, .expiredKey = ttlKey, .pTbUids = pTbUids}; .ttlDropMaxCount = ttlDropMaxCount, .count = 0, .expiredKey = ttlKey, .pTbUids = pTbUids};
return tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry); ret = tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry);
if (ret) {
goto _out;
}
size_t vIdx = 0;
for (size_t i = 0; i < pTbUids->size; i++) {
tb_uid_t *pUid = taosArrayGet(pTbUids, i);
if (taosHashGet(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t)) == NULL) {
// not in dirty && expired in tdb => must be expired
taosArraySet(pTbUids, vIdx, pUid);
vIdx++;
}
}
taosArrayPopTailBatch(pTbUids, pTbUids->size - vIdx);
_out:
return ret;
} }
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) { static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) {

View File

@ -160,7 +160,7 @@ static int32_t vnodePreProcessDropTtlMsg(SVnode *pVnode, SRpcMsg *pMsg) {
} }
{ // find expired uids { // find expired uids
tbUids = taosArrayInit(8, sizeof(int64_t)); tbUids = taosArrayInit(8, sizeof(tb_uid_t));
if (tbUids == NULL) { if (tbUids == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
@ -945,7 +945,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
int32_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId; int32_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
char detail[1000] = {0}; char detail[1000] = {0};
sprintf(detail, "btime:%" PRId64 ", flags:%d, ttl:%d, type:%d", sprintf(detail, "btime:%" PRId64 ", flags:%d, ttl:%d, type:%d",
pCreateReq->btime, pCreateReq->flags, pCreateReq->ttl, pCreateReq->type); pCreateReq->btime, pCreateReq->flags, pCreateReq->ttl, pCreateReq->type);
SName name = {0}; SName name = {0};

View File

@ -0,0 +1,37 @@
import time
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
class TDTestCase:
updatecfgDict = {'ttlUnit': 1, "ttlPushInterval": 1, "ttlChangeOnWrite": 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
self.ttl = 5
self.dbname = "test"
def check_ttl_result(self):
tdSql.execute(f'create database {self.dbname}')
tdSql.execute(f'create table {self.dbname}.t1(ts timestamp, c1 int)')
tdSql.execute(f'create table {self.dbname}.t2(ts timestamp, c1 int) ttl {self.ttl}')
tdSql.query(f'show {self.dbname}.tables')
tdSql.checkRows(2)
tdSql.execute(f'flush database {self.dbname}')
time.sleep(self.ttl + 2)
tdSql.query(f'show {self.dbname}.tables')
tdSql.checkRows(1)
def run(self):
self.check_ttl_result()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,63 @@
import time
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
class TDTestCase:
updatecfgDict = {'ttlUnit': 1, "ttlPushInterval": 3, "ttlChangeOnWrite": 1, "trimVDbIntervalSec": 360,
"ttlFlushThreshold": 100, "ttlBatchDropNum": 10}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
self.ttl = 5
self.tables = 100
self.dbname = "test"
def check_batch_drop_num(self):
tdSql.execute(f'create database {self.dbname} vgroups 1')
tdSql.execute(f'use {self.dbname}')
tdSql.execute(f'create table stb(ts timestamp, c1 int) tags(t1 int)')
for i in range(self.tables):
tdSql.execute(f'create table t{i} using stb tags({i}) ttl {self.ttl}')
tdSql.execute(f'flush database {self.dbname}')
time.sleep(self.ttl + self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query('show tables')
tdSql.checkRows(90)
def check_ttl_result(self):
tdSql.execute(f'drop database if exists {self.dbname}')
tdSql.execute(f'create database {self.dbname}')
tdSql.execute(f'create table {self.dbname}.t1(ts timestamp, c1 int)')
tdSql.execute(f'create table {self.dbname}.t2(ts timestamp, c1 int) ttl {self.ttl}')
tdSql.query(f'show {self.dbname}.tables')
tdSql.checkRows(2)
tdSql.execute(f'flush database {self.dbname}')
time.sleep(self.ttl - 1)
tdSql.execute(f'insert into {self.dbname}.t2 values(now, 1)');
tdSql.execute(f'flush database {self.dbname}')
time.sleep(self.ttl - 1)
tdSql.query(f'show {self.dbname}.tables')
tdSql.checkRows(2)
tdSql.execute(f'flush database {self.dbname}')
time.sleep(self.ttl * 2)
tdSql.query(f'show {self.dbname}.tables')
tdSql.checkRows(1)
def run(self):
self.check_batch_drop_num()
self.check_ttl_result()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -35,6 +35,7 @@ class TDTestCase:
tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.ttl_param}') tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.ttl_param}')
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum) tdSql.checkRows(self.tbnum)
tdSql.execute(f'flush database db')
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1) sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(0) tdSql.checkRows(0)
@ -42,6 +43,7 @@ class TDTestCase:
tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.default_ttl}') tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.default_ttl}')
for i in range(int(self.tbnum/2)): for i in range(int(self.tbnum/2)):
tdSql.execute(f'alter table db.{self.ntbname}_{i} ttl {self.modify_ttl}') tdSql.execute(f'alter table db.{self.ntbname}_{i} ttl {self.modify_ttl}')
tdSql.execute(f'flush database db')
sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'] + 1) sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum - int(self.tbnum/2)) tdSql.checkRows(self.tbnum - int(self.tbnum/2))
@ -54,6 +56,7 @@ class TDTestCase:
tdSql.execute(f'create table db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param}') tdSql.execute(f'create table db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param}')
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum) tdSql.checkRows(self.tbnum)
tdSql.execute(f'flush database db')
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1) sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(0) tdSql.checkRows(0)
@ -63,6 +66,7 @@ class TDTestCase:
tdSql.checkRows(self.tbnum) tdSql.checkRows(self.tbnum)
for i in range(int(self.tbnum/2)): for i in range(int(self.tbnum/2)):
tdSql.execute(f'alter table db.{self.stbname}_{i} ttl {self.modify_ttl}') tdSql.execute(f'alter table db.{self.stbname}_{i} ttl {self.modify_ttl}')
tdSql.execute(f'flush database db')
sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'] + 1) sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum - int(self.tbnum/2)) tdSql.checkRows(self.tbnum - int(self.tbnum/2))
@ -75,6 +79,7 @@ class TDTestCase:
tdSql.execute(f'insert into db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param} values(now,1)') tdSql.execute(f'insert into db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param} values(now,1)')
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum) tdSql.checkRows(self.tbnum)
tdSql.execute(f'flush database db')
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1) sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(0) tdSql.checkRows(0)