1. fix stream wrong group id for new child tables.

2. fix md5 function wrong bytes returned
This commit is contained in:
wangjiaming0909 2024-04-22 14:00:06 +08:00
parent 2c0624a8ae
commit f971cfb778
4 changed files with 45 additions and 27 deletions

View File

@ -287,7 +287,19 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo)
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
SMetaReader* mr = (SMetaReader*)pContext;
bool isTagCol = false, isTbname = false;
if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
SColumnNode* pCol = (SColumnNode*)*pNode;
if (pCol->colType == COLUMN_TYPE_TBNAME)
isTbname = true;
else
isTagCol = true;
} else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
SFunctionNode* pFunc = (SFunctionNode*)*pNode;
if (pFunc->funcType == FUNCTION_TYPE_TBNAME)
isTbname = true;
}
if (isTagCol) {
SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
@ -316,24 +328,21 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
}
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
} else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
SFunctionNode* pFuncNode = *(SFunctionNode**)pNode;
if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
return DEAL_RES_ERROR;
}
res->translate = true;
res->node.resType = pFuncNode->node.resType;
int32_t len = strlen(mr->me.name);
res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
memcpy(varDataVal(res->datum.p), mr->me.name, len);
varDataSetLen(res->datum.p, len);
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
} else if (isTbname) {
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
return DEAL_RES_ERROR;
}
res->translate = true;
res->node.resType = ((SExprNode*)(*pNode))->resType;
int32_t len = strlen(mr->me.name);
res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
memcpy(varDataVal(res->datum.p), mr->me.name, len);
varDataSetLen(res->datum.p, len);
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
}
return DEAL_RES_CONTINUE;

View File

@ -2517,7 +2517,7 @@ static int32_t translateMd5(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = MD5_OUTPUT_LEN, .type = TSDB_DATA_TYPE_VARCHAR};
pFunc->node.resType = (SDataType){.bytes = MD5_OUTPUT_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR};
return TSDB_CODE_SUCCESS;
}

View File

@ -6328,6 +6328,7 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
if (code == TSDB_CODE_SUCCESS) {
code = tsmaOptRewriteNodeList(pNewScan->pGroupTags, pTsmaOptCtx, pTsma, true, true);
}
pTsmaOptCtx->pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
if (pTsmaOptCtx->pScan->pTsmaTargetTbVgInfo && pTsmaOptCtx->pScan->pTsmaTargetTbVgInfo->size > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pTsmaOptCtx->pScan->pTsmas); ++i) {
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmaOptCtx->pScan->pTsmas, i);

View File

@ -1209,7 +1209,7 @@ class TDTestCase:
self.test_ddl()
self.test_query_with_tsma()
# bug to fix
# self.test_flush_query()
self.test_flush_query()
#cluster test
cluster_dnode_list = tdSql.get_cluseter_dnodes()
@ -1231,14 +1231,22 @@ class TDTestCase:
# self.test_drop_ctable()
self.test_drop_db()
def wait_query(self, sql: str, expected_row_num: int, timeout_in_seconds: float):
def wait_query(self, sql: str, expected_row_num: int, timeout_in_seconds: float, is_expect_row = None):
timeout = timeout_in_seconds
tdSql.query(sql)
while timeout > 0 and tdSql.getRows() != expected_row_num:
tdLog.debug(f'start to wait query: {sql} to return {expected_row_num}, got: {tdSql.getRows()}, remain: {timeout_in_seconds - timeout}')
rows: int = 0
for row in tdSql.queryResult:
if is_expect_row is None or is_expect_row(row):
rows = rows + 1
while timeout > 0 and rows != expected_row_num:
tdLog.debug(f'start to wait query: {sql} to return {expected_row_num}, got: {str(tdSql.queryResult)} useful rows: {rows}, remain: {timeout_in_seconds - timeout}')
time.sleep(1)
timeout = timeout - 1
tdSql.query(sql)
rows = 0
for row in tdSql.queryResult:
if is_expect_row is None or is_expect_row(row):
rows = rows + 1
if timeout <= 0:
tdLog.exit(f'failed to wait query: {sql} to return {expected_row_num} rows timeout: {timeout_in_seconds}s')
else:
@ -1255,7 +1263,7 @@ class TDTestCase:
tdSql.error('drop tsma test.tsma1', -2147482491)
tdSql.execute('drop tsma test.tsma2', queryTimes=1)
tdSql.execute('drop tsma test.tsma1', queryTimes=1)
self.wait_query('show transactions', 0, 10)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
tdSql.execute('drop database test', queryTimes=1)
self.init_data()
@ -1296,7 +1304,7 @@ class TDTestCase:
'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096)
tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1)
self.wait_query('show transactions', 0, 10)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
tdSql.execute('drop database nsdb')
# drop norm table
@ -1323,7 +1331,7 @@ class TDTestCase:
# test drop stream
tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first
self.wait_query('show transactions', 0, 10)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
tdSql.execute('drop database test', queryTimes=1)
self.init_data()
@ -1424,7 +1432,7 @@ class TDTestCase:
tdSql.error(
'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
self.wait_query('show transactions', 0, 10)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
tdSql.execute('drop database nsdb')
def test_create_tsma_on_norm_table(self):
@ -1569,7 +1577,7 @@ class TDTestCase:
tdSql.error('create tsma tsma_illegal on test.meters function(avg(c8)) interval(5m)',-2147473406)
def test_flush_query(self):
tdSql.execute('insert into test.norm_tb (ts,c1_new,c2) values (now,1,2)(now+1s,2,3)(now+2s,2,3)(now+3s,2,3) (now+4s,1,2)(now+5s,2,3)(now+6s,2,3)(now+7s,2,3); select /*+ skip_tsma()*/ avg(c1_new),avg(c2) from test.norm_tb interval(10m);select avg(c1_new),avg(c2) from test.norm_tb interval(10m);select * from information_schema.ins_stream_tasks;', queryTimes=1)
tdSql.execute('insert into test.norm_tb (ts,c1,c2) values (now,1,2)(now+1s,2,3)(now+2s,2,3)(now+3s,2,3) (now+4s,1,2)(now+5s,2,3)(now+6s,2,3)(now+7s,2,3); select /*+ skip_tsma()*/ avg(c1),avg(c2) from test.norm_tb interval(10m);select avg(c1),avg(c2) from test.norm_tb interval(10m);select * from information_schema.ins_stream_tasks;', queryTimes=1)
tdSql.execute('flush database test', queryTimes=1)
tdSql.query('select count(*) from test.meters', queryTimes=1)
tdSql.checkData(0,0,100000)