From dacbfc03cd64c9c20afde14b27d3a48bed5c7158 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 20 May 2022 23:24:51 +0800 Subject: [PATCH] test(tmq): enable drop table case --- example/src/tmq.c | 7 ++++--- source/dnode/vnode/src/tq/tqRead.c | 10 +++++++--- tests/system-test/7-tmq/subscribeStb.py | 2 +- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 1abce3f188..338399232c 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -106,8 +106,8 @@ int32_t create_topic() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/ + /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ + pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; @@ -167,6 +167,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "td.connect.pass", "taosdata"); /*tmq_conf_set(conf, "td.connect.db", "abc1");*/ tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "enable.auto.commit", "false"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); assert(tmq); @@ -239,7 +240,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { msg_process(tmqmessage); taos_free_result(tmqmessage); - tmq_commit_async(tmq, NULL, tmq_commit_cb_print, NULL); + /*tmq_commit_async(tmq, NULL, tmq_commit_cb_print, NULL);*/ /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ } } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 096208b961..918660a9ec 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -90,6 +90,10 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p int32_t sversion = 1; if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) { pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); + if (pHandle->pSchema == NULL) { + tqError("cannot found schema for table: %ld, version %d", pHandle->msgIter.suid, pHandle->sver); + return -1; + } // this interface use suid instead of uid pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true); @@ -190,7 +194,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p } return 0; FAIL: - taosArrayDestroy(*ppCols); + if (*ppCols) taosArrayDestroy(*ppCols); return -1; } @@ -235,8 +239,8 @@ int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { int tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { ASSERT(pHandle->tbIdHash != NULL); - for(int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { - int64_t* pKey = (int64_t*) taosArrayGet(tbUidList, i); + for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { + int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); taosHashRemove(pHandle->tbIdHash, pKey, sizeof(int64_t)); } diff --git a/tests/system-test/7-tmq/subscribeStb.py b/tests/system-test/7-tmq/subscribeStb.py index 6fcc2d5e5f..3fc964b37d 100644 --- a/tests/system-test/7-tmq/subscribeStb.py +++ b/tests/system-test/7-tmq/subscribeStb.py @@ -1377,7 +1377,7 @@ class TDTestCase: self.tmqCase1(cfgPath, buildPath) self.tmqCase2(cfgPath, buildPath) - #self.tmqCase3(cfgPath, buildPath) + self.tmqCase3(cfgPath, buildPath) self.tmqCase4(cfgPath, buildPath) self.tmqCase5(cfgPath, buildPath) self.tmqCase6(cfgPath, buildPath)