From b83cc11043334f79eac15cf220d406dfdb51205a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 24 Aug 2023 16:38:53 +0800 Subject: [PATCH 1/4] fix:[TD-25651] reset epoch if consumer changed to avoid consumeing no data --- source/dnode/vnode/src/tq/tq.c | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 815e9647b5..c8da7e0b46 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -879,20 +879,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); - // atomic_add_fetch_32(&pHandle->epoch, 1); + atomic_store_32(&pHandle->epoch, 0); - // kill executing task - // if(tqIsHandleExec(pHandle)) { - // qTaskInfo_t pTaskInfo = pHandle->execHandle.task; - // if (pTaskInfo != NULL) { - // qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); - // } - - // if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - // qStreamCloseTsdbReader(pTaskInfo); - // } - // } - // remove if it has been register in the push manager, and return one empty block to consumer tqUnregisterPushHandle(pTq, pHandle); ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); } From 64959f14e9440049bf4e8e2e39ef95d0230fd8c1 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 24 Aug 2023 18:18:02 +0800 Subject: [PATCH 2/4] fix:dot process in schemaless --- source/client/src/clientSml.c | 14 +++++++++++++- utils/test/c/sml_test.c | 13 +++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index ffff3df5d0..8e5c6e7250 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -218,7 +218,16 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable) { if (strlen(oneTable->childTableName) == 0) { SArray *dst = taosArrayDup(oneTable->tags, NULL); - RandTableName rName = {dst, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName}; + ASSERT(oneTable->sTableNameLen < TSDB_TABLE_NAME_LEN); + char superName[TSDB_TABLE_NAME_LEN] = {0}; + RandTableName rName = {dst, NULL, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName}; + if(tsSmlDot2Underline){ + memcpy(superName, oneTable->sTableName, oneTable->sTableNameLen); + smlStrReplace(superName, oneTable->sTableNameLen); + rName.stbFullName = superName; + }else{ + rName.stbFullName = oneTable->sTableName; + } buildChildTableName(&rName); taosArrayDestroy(dst); @@ -230,6 +239,9 @@ void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tin char key[TSDB_TABLE_NAME_LEN * 2 + 1] = {0}; size_t nLen = strlen(tinfo->childTableName); memcpy(key, currElement->measure, currElement->measureLen); + if(tsSmlDot2Underline){ + smlStrReplace(key, currElement->measureLen); + } memcpy(key + currElement->measureLen + 1, tinfo->childTableName, nLen); void *uid = taosHashGet(info->tableUids, key, diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index e4ed6037a3..5be9d98a7f 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1533,6 +1533,7 @@ int sml_ts3724_Test() { const char *sql[] = { "stb.2,t1=1 f1=283i32 1632299372000", + "stb_2,t1=1 f1=283i32 1632299372000", ".stb2,t1=1 f1=106i32 1632299378000", "stb2.,t1=1 f1=106i32 1632299378000", }; @@ -1547,6 +1548,18 @@ int sml_ts3724_Test() { printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); taos_free_result(pRes); + pRes = taos_query(taos, "select * from stb_2"); + TAOS_ROW row = taos_fetch_row(pRes); + int numRows = taos_affected_rows(pRes); + ASSERT(numRows == 1); + taos_free_result(pRes); + + pRes = taos_query(taos, "show stables"); + row = taos_fetch_row(pRes); + numRows = taos_affected_rows(pRes); + ASSERT(numRows == 3); + taos_free_result(pRes); + taos_close(taos); return code; From d5cc4155420a2e93b012c56827791ea2cf7810dd Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 24 Aug 2023 18:35:53 +0800 Subject: [PATCH 3/4] fix:test case error --- tests/system-test/7-tmq/subscribeDb0.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/7-tmq/subscribeDb0.py b/tests/system-test/7-tmq/subscribeDb0.py index ed13fcbe06..d4dfe425dc 100644 --- a/tests/system-test/7-tmq/subscribeDb0.py +++ b/tests/system-test/7-tmq/subscribeDb0.py @@ -237,7 +237,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows != expectrowcnt: + if totalConsumeRows < expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") From 8045f30be8b8d27f82a854f503314cb2431c48fb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 24 Aug 2023 23:15:35 +0800 Subject: [PATCH 4/4] fix:sml test case error for tbname --- tests/system-test/2-query/sml_TS-3724.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/2-query/sml_TS-3724.py b/tests/system-test/2-query/sml_TS-3724.py index a8b16c4662..410e266f10 100644 --- a/tests/system-test/2-query/sml_TS-3724.py +++ b/tests/system-test/2-query/sml_TS-3724.py @@ -67,7 +67,7 @@ class TDTestCase: tdSql.query(f"select distinct tbname from {dbname}.`sys_if_bytes_out`") tdSql.checkRows(2) - tdSql.query(f"select * from {dbname}.t_fc70dec6677d4277c5d9799c4da806da order by times") + tdSql.query(f"select * from {dbname}.t_f67972b49aa8adf8bca5d0d54f0d850d order by times") tdSql.checkRows(2) tdSql.checkData(0, 1, 1.300000000) tdSql.checkData(1, 1, 13.000000000)