add tsma test, fix catalog cache update

This commit is contained in:
wangjiaming0909 2024-01-17 09:50:35 +08:00
parent 54666c953f
commit 54100b1c0d
7 changed files with 296 additions and 75 deletions

View File

@ -331,7 +331,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
(_type) == TDMT_MND_DROP_STB || (_type) == TDMT_MND_CREATE_VIEW || (_type) == TDMT_MND_DROP_VIEW)
(_type) == TDMT_MND_DROP_STB || (_type) == TDMT_MND_CREATE_VIEW || (_type) == TDMT_MND_DROP_VIEW || \
(_type) == TDMT_MND_CREATE_TSMA || (_type) == TDMT_MND_DROP_TSMA)
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
(SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || \

View File

@ -341,7 +341,7 @@ static int32_t hbprocessTSMARsp(void* value, int32_t valueLen, struct SCatalog*
STableTSMAInfo* pTsmaInfo = taosArrayGetP(hbRsp.pTsmas, i);
if (!pTsmaInfo->pFuncs) {
tscDebug("hb to remote tsma: %s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
tscDebug("hb to remove tsma: %s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
catalogRemoveTSMA(pCatalog, pTsmaInfo);
tFreeAndClearTableTSMAInfo(pTsmaInfo);
} else {

View File

@ -582,6 +582,7 @@ typedef struct SCtgDropTbTSMAMsg {
uint64_t tsmaId;
uint64_t dbId;
uint64_t tbId;
bool dropAllForTb;
} SCtgDropTbTSMAMsg;

View File

@ -3029,6 +3029,9 @@ int32_t ctgRemoveTbMetaFromCache(SCatalog *pCtg, SName *pTableName, bool syncReq
CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq));
}
// TODO TEST normal table
CTG_ERR_JRET(ctgDropTSMAForTbEnqueue(pCtg, pTableName, syncReq));
_return:
taosMemoryFreeClear(tblMeta);
@ -3262,7 +3265,7 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
}
CTG_LOCK(CTG_READ, &pCache->tsmaLock);
if (!pCache->pTsmas) {
if (!pCache->pTsmas || pCache->pTsmas->size == 0) {
CTG_UNLOCK(CTG_READ, &pCache->tsmaLock);
taosHashRelease(dbCache->tsmaCache, pCache);
ctgDebug("tsma for tb: %s.%s not in cache", pName->tname, dbFName);
@ -3302,6 +3305,7 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
}
res.pRes = pRsp;
taosArrayPush(pCtx->pResList, &res);
CTG_UNLOCK(CTG_READ, &pCache->tsmaLock);
taosHashRelease(dbCache->tsmaCache, pCache);
}
ctgReleaseDBCache(pCtg, dbCache);
@ -3343,7 +3347,7 @@ int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool sync
op->opId = CTG_OP_DROP_TB_TSMA;
op->syncOp = syncOp;
SCtgDropTbTSMAMsg* msg = taosMemoryMalloc(sizeof(SCtgDropTbTSMAMsg));
SCtgDropTbTSMAMsg* msg = taosMemoryCalloc(1, sizeof(SCtgDropTbTSMAMsg));
if (!msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbTSMAMsg));
taosMemoryFree(op);
@ -3366,34 +3370,63 @@ _return:
CTG_RET(code);
}
static SCtgCacheOperation* createDropAllTbTsmaCtgCacheOp(SCatalog* pCtg, const STSMACache* pCache, bool syncOp) {
SCtgCacheOperation* pOp = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
if (!pOp) return NULL;
SCtgDropTbTSMAMsg* pMsg = taosMemoryCalloc(1, sizeof(SCtgDropTbTSMAMsg));
if (!pMsg) {
taosMemoryFree(pOp);
return NULL;
}
pOp->opId = CTG_OP_DROP_TB_TSMA;
pOp->syncOp = syncOp;
pMsg->pCtg = pCtg;
pMsg->dbId = pCache->dbId;
pMsg->tbId = pCache->suid;
pMsg->tsmaId = pCache->tsmaId;
pMsg->dropAllForTb = true;
tstrncpy(pMsg->tsmaName, pCache->name, TSDB_TABLE_NAME_LEN);
tstrncpy(pMsg->dbFName, pCache->dbFName, TSDB_DB_FNAME_LEN);
tstrncpy(pMsg->tbName, pCache->tb, TSDB_TABLE_NAME_LEN);
pOp->data = pMsg;
return pOp;
}
int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation* op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
if (!op) CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
op->opId = CTG_OP_DROP_TB_TSMA;
op->syncOp = syncOp;
SCtgDropTbTSMAMsg* msg = taosMemoryMalloc(sizeof(SCtgDropTbTSMAMsg));
if (!msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbTSMAMsg));
taosMemoryFree(op);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
SCtgDBCache* pDbCache = NULL;
SCtgCacheOperation* pOp = NULL;
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pName, dbFName);
CTG_ERR_JRET(ctgGetDBCache(pCtg, dbFName, &pDbCache));
if (NULL == pDbCache || !pDbCache->tsmaCache) {
goto _return;
}
msg->pCtg = pCtg;
msg->dbId = 0;
msg->tbId = 0;
msg->tsmaId = 0;
msg->tsmaName[0] = '\0';
tNameGetFullDbName(pName, msg->dbFName);
tstrncpy(msg->tbName, pName->tname, TSDB_TABLE_NAME_LEN);
SCtgTSMACache *pCtgCache = taosHashGet(pDbCache->tsmaCache, pName->tname, strlen(pName->tname));
if (!pCtgCache || !pCtgCache->pTsmas || pCtgCache->pTsmas->size == 0) {
goto _return;
}
op->data = msg;
CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock);
STSMACache *pCache = taosArrayGetP(pCtgCache->pTsmas, 0);
pOp = createDropAllTbTsmaCtgCacheOp(pCtg, pCache, syncOp);
if (!pOp) {
code = TSDB_CODE_OUT_OF_MEMORY;
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
goto _return;
}
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
CTG_ERR_JRET(ctgEnqueue(pCtg, pOp));
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
if (pOp) {
taosMemoryFree(pOp->data);
taosMemoryFree(pOp);
}
CTG_RET(code);
}
@ -3479,29 +3512,44 @@ int32_t ctgOpDropTbTSMA(SCtgCacheOperation *operation) {
goto _return;
}
CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock);
STSMACache *pCache = taosArrayGetP(pCtgCache->pTsmas, 0);
uint64_t cacheSize = 0;
if (msg->tbId != 0 && pCache->suid != msg->tbId) {
// table id mismatch, skip drops
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
goto _return;
}
for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) {
pCache = taosArrayGetP(pCtgCache->pTsmas, i);
if (pCache->tsmaId != msg->tsmaId) {
continue;
STSMACache *pCache = NULL;
if (msg->dropAllForTb) {
CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock);
for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) {
pCache = taosArrayGetP(pCtgCache->pTsmas, i);
cacheSize += ctgGetTbTSMACacheSize(pCache);
CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare,
ctgTSMAVersionSearchCompare));
CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA);
}
cacheSize = ctgGetTbTSMACacheSize(pCache);
CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare,
ctgTSMAVersionSearchCompare));
taosArrayRemove(pCtgCache->pTsmas, i);
tFreeAndClearTableTSMAInfo(pCache);
CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA);
break;
taosArrayDestroyP(pCtgCache->pTsmas, tFreeAndClearTableTSMAInfo);
pCtgCache->pTsmas = NULL;
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
taosHashRemove(dbCache->tsmaCache, msg->tbName, TSDB_TABLE_NAME_LEN);
} else {
CTG_LOCK(CTG_WRITE, &pCtgCache->tsmaLock);
pCache = taosArrayGetP(pCtgCache->pTsmas, 0);
if (msg->tbId != 0 && pCache->suid != msg->tbId) {
// table id mismatch, skip drops
CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock);
goto _return;
}
for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) {
pCache = taosArrayGetP(pCtgCache->pTsmas, i);
if (pCache->tsmaId != msg->tsmaId) {
continue;
}
cacheSize = ctgGetTbTSMACacheSize(pCache);
CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare,
ctgTSMAVersionSearchCompare));
taosArrayRemove(pCtgCache->pTsmas, i);
tFreeAndClearTableTSMAInfo(pCache);
CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA);
break;
}
CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock);
}
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
//taosHashRemove(dbCache->tsmaCache, msg->tbName, strlen(msg->tbName));
atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize);
_return:

View File

@ -10625,6 +10625,11 @@ static int32_t translateCreateTSMA(STranslateContext* pCxt, SCreateTSMAStmt* pSt
if (code == TSDB_CODE_SUCCESS) {
code = buildCreateTSMAReq(pCxt, pStmt, &smaReq);
}
if ( TSDB_CODE_SUCCESS == code) {
SName name;
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name);
code = collectUseTable(&name, pCxt->pTargetTables);
}
if (TSDB_CODE_SUCCESS == code) {
// TODO replace with tsma serialization func
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TSMA, (FSerializeFunc)tSerializeSMCreateSmaReq, &smaReq);

View File

@ -5776,7 +5776,6 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode) {
SNode* pConds = pScan->node.pConditions;
if (pScan->scanType != SCAN_TYPE_TABLE || !pParent || pConds) return false;
if (!pScan->pTsmas || pScan->pTsmas->size <= 0) {
return false;
}

View File

@ -1,11 +1,8 @@
from random import randrange
from pandas import tseries
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
@ -14,6 +11,148 @@ from util.dnodes import *
from util.common import *
# from tmqCommon import *
class TSMA:
def __init__(self):
self.tsma_name = ''
self.db_name = ''
self.original_table_name = ''
self.funcs = []
self.cols = []
self.interval: str = ''
class UsedTsma:
TS_MIN = '-9223372036854775808'
TS_MAX = '9223372036854775806'
def __init__(self) -> None:
self.name = '' ## tsma name or table name
self.time_range_start = UsedTsma.TS_MIN
self.time_range_end = UsedTsma.TS_MAX
self.is_tsma_ = False
def __eq__(self, __value: object) -> bool:
if isinstance(__value, self.__class__):
return self.name == __value.name \
and self.time_range_start == __value.time_range_start \
and self.time_range_end == __value.time_range_end \
and self.is_tsma_ == __value.is_tsma_
else:
return False
class TSMAQueryContext:
def __init__(self) -> None:
self.sql = ''
self.used_tsmas: List[UsedTsma] = []
def __eq__(self, __value) -> bool:
if isinstance(__value, self.__class__):
if len(self.used_tsmas) != len(__value.used_tsmas):
return False
for used_tsma1, used_tsma2 in zip(self.used_tsmas, __value.used_tsmas):
if not used_tsma1 == used_tsma2:
return False
return True
else:
return False
def __str__(self) -> str:
return str(self.used_tsmas)
def has_tsma(self) -> bool:
for tsma in self.used_tsmas:
if tsma.is_tsma_:
return True
return False
class TSMAQueryContextBuilder:
def __init__(self) -> None:
self.ctx: TSMAQueryContext = TSMAQueryContext()
def get_ctx(self) -> TSMAQueryContext:
return self.ctx
def with_sql(self, sql: str):
self.ctx.sql = sql
return self
def should_query_with_table(self, tb_name: str, ts_begin: str, ts_end: str) -> 'TSMAQueryContextBuilder':
used_tsma: UsedTsma = UsedTsma()
used_tsma.name = tb_name
used_tsma.time_range_start = ts_begin
used_tsma.time_range_end = ts_end
used_tsma.is_tsma_ = False
self.ctx.used_tsmas.append(used_tsma)
return self
def should_query_with_tsma(self, tsma_name: str, ts_begin: str, ts_end: str) -> 'TSMAQueryContextBuilder':
used_tsma: UsedTsma = UsedTsma()
used_tsma.name = tsma_name + '_tsma_res_stb'
used_tsma.time_range_start = ts_begin
used_tsma.time_range_end = ts_end
used_tsma.is_tsma_ = True
self.ctx.used_tsmas.append(used_tsma)
return self
class TSMATestContext:
def __init__(self, tdSql: TDSql) -> None:
self.tsmas = []
self.tdSql: TDSql = tdSql
def explain_sql(self, sql: str):
tdSql.execute("alter local 'querySmaOptimize' '1'")
sql = "explain verbose true " + sql
tdSql.query(sql, queryTimes=1)
res = self.tdSql.queryResult
if self.tdSql.queryResult is None:
raise
return res
def get_tsma_query_ctx(self, sql: str):
explain_res = self.explain_sql(sql)
query_ctx: TSMAQueryContext = TSMAQueryContext()
query_ctx.sql = sql
query_ctx.used_tsmas = []
used_tsma : UsedTsma = UsedTsma()
for row in explain_res:
row = str(row)
if len(used_tsma.name) == 0:
idx = row.find("Table Scan on ")
if idx >= 0:
words = row[idx:].split(' ')
used_tsma.name = words[3]
if used_tsma.name.endswith('tsma_res_stb'):
used_tsma.is_tsma_ = True
else:
idx = row.find('Time Range:')
if idx >= 0:
row = row[idx:].split('[')[1]
row = row.split(']')[0]
words = row.split(',')
used_tsma.time_range_start = words[0].strip()
used_tsma.time_range_end = words[1].strip()
query_ctx.used_tsmas.append(used_tsma)
used_tsma = UsedTsma()
return query_ctx
def check_explain(self, sql: str, expect: TSMAQueryContext):
query_ctx = self.get_tsma_query_ctx(sql)
if not query_ctx == expect:
tdLog.exit('check explain failed for sql: %s \nexpect: %s \nactual: %s' % (sql, str(expect), str(query_ctx)))
def check_result(self, sql: str):
pass
def check_sql(self, sql: str, expect: TSMAQueryContext):
self.check_explain(sql, expect=expect)
if expect.has_tsma():
self.check_result(sql)
def check_sqls(self, sqls: list[str], expects: list[TSMAQueryContext]):
for sql, query_ctx in zip(sqls, expects):
self.check_sql(sql, query_ctx)
class TDTestCase:
def __init__(self):
self.vgroups = 4
@ -25,6 +164,7 @@ class TDTestCase:
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
self.test_ctx: TSMATestContext = TSMATestContext(tdSql)
def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'):
if dropFlag == 1:
@ -62,7 +202,7 @@ class TDTestCase:
sql += " %s%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
if (i < ctbNum/2):
sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep + randrange(500), j%10, j%10, j%10, j%10, j%10, j%10, j%10, j%10)
sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep + randrange(500), j%10 + randrange(100), j%10 + randrange(200), j%10, j%10, j%10, j%10, j%10, j%10)
else:
sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep + randrange(500), j%10, j%10, j%10, j%10, j%10, j%10)
rowsBatched += 1
@ -125,55 +265,82 @@ class TDTestCase:
sql = 'drop tsma %s.%s' % (db, tsma_name)
tdSql.execute(sql, queryTimes=1)
def test_explain_query_with_tsma(self):
def check_explain_res_has_row(self, plan_str_expect: str, explain_output):
plan_found = False
for row in explain_output:
if str(row).find(plan_str_expect) >= 0:
tdLog.debug("plan: [%s] found in: [%s]" % (plan_str_expect, str(row)))
plan_found = True
break
if not plan_found:
tdLog.exit("plan: %s not found in res: [%s]" % (plan_str_expect, str(explain_output)))
def test_query_with_tsma(self):
self.init_data()
self.create_tsma('tsma1', 'test', 'meters', ['avg'], ['c1', 'c2'], '5m')
self.create_tsma('tsma2', 'test', 'meters', ['avg'], ['c1', 'c2'], '30m')
self.test_explain_query_with_tsma_interval()
self.test_explain_query_with_tsma_agg()
#time.sleep(9999999)
self.test_query_with_tsma_interval()
self.test_query_with_tsma_agg()
def test_explain_query_with_tsma_interval(self):
self.test_explain_query_with_tsma_interval_no_partition()
self.test_explain_query_with_tsma_interval_partition_by_col()
self.test_explain_query_with_tsma_interval_partition_by_tbname()
self.test_explain_query_with_tsma_interval_partition_by_tag()
self.test_explain_query_with_tsma_interval_partition_by_hybrid()
def test_query_with_tsma_interval(self):
self.test_query_with_tsma_interval_no_partition()
self.test_query_with_tsma_interval_partition_by_col()
self.test_query_with_tsma_interval_partition_by_tbname()
self.test_query_with_tsma_interval_partition_by_tag()
self.test_query_with_tsma_interval_partition_by_hybrid()
def test_explain_query_with_tsma_interval_no_partition(self):
def test_query_with_tsma_interval_no_partition(self):
pass
def test_explain_query_with_tsma_interval_partition_by_tbname(self):
def test_query_with_tsma_interval_partition_by_tbname(self):
pass
def test_explain_query_with_tsma_interval_partition_by_tag(self):
def test_query_with_tsma_interval_partition_by_tag(self):
pass
def test_explain_query_with_tsma_interval_partition_by_col(self):
def test_query_with_tsma_interval_partition_by_col(self):
pass
def test_explain_query_with_tsma_interval_partition_by_hybrid(self):
def test_query_with_tsma_interval_partition_by_hybrid(self):
pass
def test_explain_query_with_tsma_agg(self):
self.test_explain_query_with_tsma_agg_no_group_by()
self.test_explain_query_with_tsma_agg_group_by_hybrid()
self.test_explain_query_with_tsma_agg_group_by_tbname()
self.test_explain_query_with_tsma_agg_group_by_tag()
def test_query_with_tsma_agg(self):
self.test_query_with_tsma_agg_no_group_by()
self.test_query_with_tsma_agg_group_by_hybrid()
self.test_query_with_tsma_agg_group_by_tbname()
self.test_query_with_tsma_agg_group_by_tag()
def test_explain_query_with_tsma_agg_no_group_by(self):
def test_query_with_tsma_agg_no_group_by(self):
ctxs: List[TSMAQueryContext] = []
ctxs.append(TSMAQueryContextBuilder().with_sql('select avg(c1), avg(c2) from meters').should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
ctxs.append(TSMAQueryContextBuilder().with_sql('select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"')\
.should_query_with_table('meters', '','').get_ctx())
tsma_sqls = [
'select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
'select avg(c1) + avg(c2), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
'select avg(c1) + avg(c2), avg(c2) + 1 from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
'select avg(c1) + avg(c2), from meters where tbname like "%t1%"'
]
non_tsma_sqls = [
'select avg(c1), avg(c2) from meters where c1 is not NULL',
'select avg(c1), avg(c2), spread(c4) from meters',
]
for ctx in ctxs:
self.test_ctx.check_sql(ctx.sql, ctx)
def test_query_with_tsma_agg_group_by_tbname(self):
pass
def test_explain_query_with_tsma_agg_group_by_tbname(self):
def test_query_with_tsma_agg_group_by_tag(self):
pass
def test_explain_query_with_tsma_agg_group_by_tag(self):
pass
def test_explain_query_with_tsma_agg_group_by_hybrid(self):
def test_query_with_tsma_agg_group_by_hybrid(self):
pass
def run(self):
self.test_explain_query_with_tsma()
self.test_query_with_tsma()
time.sleep(999999)
def stop(self):