From d10915dce6a3cced5024c307834650f7ee19191e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 9 Oct 2023 19:03:13 +0800 Subject: [PATCH] feat:[TD-26056] add replay logic --- utils/test/c/replay_test.c | 89 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/utils/test/c/replay_test.c b/utils/test/c/replay_test.c index 1fbaac0796..df493ce2ac 100644 --- a/utils/test/c/replay_test.c +++ b/utils/test/c/replay_test.c @@ -281,6 +281,93 @@ void test_case2(TAOS* pConn, int32_t* interval, int32_t len, tsem_t* sem){ tmq_consumer_close(tmq); } +void test_case3(TAOS* pConn, int32_t* interval, int32_t len){ + TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists d1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists d1 vgroups 1 wal_retention_period 3600"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d1.table1 using d1.s1 tags(1)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d1.table2 using d1.s1 tags(2)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + insert_with_sleep_multi(pConn, interval, len); + + pRes = taos_query(pConn, "create topic t1 as select * from d1.s1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + tmq_list_t* topic_list = tmq_list_new(); + + tmq_list_append(topic_list, "t1"); + tmq_t* tmq = build_consumer(); + // 启动订阅 + tmq_subscribe(tmq, topic_list); + + int32_t timeout = 5000; + + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, timeout); + taos_free_result(tmqmessage); + + tmq_consumer_close(tmq); + + tmq = build_consumer(); + // 启动订阅 + tmq_subscribe(tmq, topic_list); + + int64_t t = 0; + int32_t totalRows = 0; + char buf[1024] = {0}; + while (1) { + tmqmessage = tmq_consumer_poll(tmq, timeout); + if (tmqmessage) { + if(t != 0 && totalRows % 4 == 0){ + ASSERT(taosGetTimestampMs() - t >= interval[totalRows/4 - 1]); + } + t = taosGetTimestampMs(); + + while(1){ + TAOS_ROW row = taos_fetch_row(tmqmessage); + if (row == NULL) { + break; + } + + TAOS_FIELD* fields = taos_fetch_fields(tmqmessage); + int32_t numOfFields = taos_field_count(tmqmessage); + const char* tbName = tmq_get_table_name(tmqmessage); + taos_print_row(buf, row, fields, numOfFields); + + printf("%lld tbname:%s, rows[%d]: %s\n", t, (tbName != NULL ? tbName : "null table"), totalRows, buf); + totalRows++; + } + + taos_free_result(tmqmessage); + } else { + break; + } + } + + ASSERT(totalRows == len * 4); + + tmq_consumer_close(tmq); + tmq_list_destroy(topic_list); +} + void* insertThreadFunc(void* param) { tsem_t* sem = (tsem_t*)param; TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -314,6 +401,8 @@ int main(int argc, char* argv[]) { test_case1(pConn, interval, sizeof(interval)/sizeof(int32_t)); printf("test_case1 success\n"); test_case2(pConn, interval, sizeof(interval)/sizeof(int32_t), &sem); + printf("test_case2 success\n"); + test_case3(pConn, interval, sizeof(interval)/sizeof(int32_t)); taos_close(pConn); taosThreadJoin(thread, NULL);