diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 252959ecd8..e398441979 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -724,6 +724,17 @@ void tmqAssignAskEpTask(void* param, void* tmrId) { taosMemoryFree(param); } +void tmqReplayTask(void* param, void* tmrId) { + int64_t refId = *(int64_t*)param; + tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); + if(tmq == NULL) goto END; + + tsem_post(&tmq->rspSem); + taosReleaseRef(tmqMgmt.rsetId, refId); +END: + taosMemoryFree(param); +} + void tmqAssignDelayedCommitTask(void* param, void* tmrId) { int64_t refId = *(int64_t*)param; generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT); @@ -1144,6 +1155,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.autoCommit = tmq->autoCommit; req.autoCommitInterval = tmq->autoCommitInterval; req.resetOffsetCfg = tmq->resetOffsetCfg; + req.enableReplay = tmq->replayEnable; for (int32_t i = 0; i < sz; i++) { char* topic = taosArrayGetP(container, i); @@ -1823,6 +1835,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if(tmq->replayEnable){ pVg->blockReceiveTs = taosGetTimestampMs(); pVg->blockSleepForReplay = pRsp->rsp.sleepTime; + if(pVg->blockSleepForReplay > 0){ + int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t)); + *pRefId1 = tmq->refId; + taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, pRefId1, tmqMgmt.timer); + } } tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 705fb86fab..01866ef893 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -112,8 +112,9 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* } STqOffsetVal offset = {0}; qStreamExtractOffset(task, &offset); - pHandle->block = createDataBlock(); - copyDataBlock(pHandle->block, pDataBlock); + pHandle->block = createOneDataBlock(pDataBlock, true); +// pHandle->block = createDataBlock(); +// copyDataBlock(pHandle->block, pDataBlock); pHandle->blockTime = offset.ts; code = getDataBlock(task, pHandle, vgId, &pDataBlock); if (code != 0){ @@ -129,14 +130,16 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pRsp->blockNum++; if (pDataBlock == NULL) { - break; - } - copyDataBlock(pHandle->block, pDataBlock); + blockDataDestroy(pHandle->block); + pHandle->block = NULL; + }else{ + copyDataBlock(pHandle->block, pDataBlock); - STqOffsetVal offset = {0}; - qStreamExtractOffset(task, &offset); - pRsp->sleepTime = offset.ts - pHandle->blockTime; - pHandle->blockTime = offset.ts; + STqOffsetVal offset = {0}; + qStreamExtractOffset(task, &offset); + pRsp->sleepTime = offset.ts - pHandle->blockTime; + pHandle->blockTime = offset.ts; + } break; }else{ if (pDataBlock == NULL) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index a4c3d395e3..215f8d3cb2 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -40,11 +40,11 @@ void tqUpdateNodeStage(STQ* pTq) { tqDebug("vgId:%d update the meta stage to be:%"PRId64, pTq->pStreamMeta->vgId, pTq->pStreamMeta->stage); } -static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) { +static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset, bool withTbName) { pRsp->reqOffset = pOffset; pRsp->rspOffset = pOffset; - pRsp->withTbName = 1; + pRsp->withTbName = withTbName; pRsp->withSchema = 1; pRsp->blockData = taosArrayInit(0, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); @@ -177,7 +177,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, int32_t vgId = TD_VID(pTq->pVnode); SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, *offset); + tqInitTaosxRsp(&taosxRsp, *offset, pRequest->withTbName); if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 227960ab22..f60890ecca 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -330,7 +330,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { break; } - qDebug("project return %d", pProjectInfo->mergeDataBlocks); if (pProjectInfo->mergeDataBlocks) { if (pRes->info.rows > 0) { pFinalRes->info.id.groupId = 0; // clear groupId diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 36b8fded81..b5856fea63 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -159,6 +159,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py diff --git a/tests/system-test/7-tmq/replay.py b/tests/system-test/7-tmq/replay.py index 7eee6743a7..bbda8600fb 100644 --- a/tests/system-test/7-tmq/replay.py +++ b/tests/system-test/7-tmq/replay.py @@ -110,7 +110,7 @@ class TDTestCase: tdLog.info(shellCmd) os.system(shellCmd) - def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1): + def create_database(self,tsql, dbName,dropFlag=1,vgroups=1,replica=1): if dropFlag == 1: tsql.execute("drop database if exists %s"%(dbName)) @@ -149,21 +149,12 @@ class TDTestCase: t = time.time() startTs = int(round(t * 1000)) - #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 - for i in range(ctbNum): - sql += " %s_%d values "%(stbName,i) - for j in range(rowsPerTbl): - sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) - rowsOfSql += 1 - if ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)): - tsql.execute(sql) - time.sleep(1) - rowsOfSql = 0 - if j < rowsPerTbl - 1: - sql = "insert into %s_%d values " %(stbName,i) - else: - sql = "insert into " + for j in range(rowsPerTbl): + for i in range(ctbNum): + sql += " %s_%d values (%d, %d, 'tmqrow_%d') "%(stbName, i, startTs + j + i, j+i, j+i) + tsql.execute(sql) + time.sleep(1) + sql = "insert into " #end sql if sql != pre_insert: #print("insert sql:%s"%sql) @@ -199,10 +190,10 @@ class TDTestCase: 'actionType': 0, \ 'dbName': 'db8', \ 'dropFlag': 1, \ - 'vgroups': 4, \ + 'vgroups': 1, \ 'replica': 1, \ 'stbName': 'stb1', \ - 'ctbNum': 1, \ + 'ctbNum': 2, \ 'rowsPerTbl': 10, \ 'batchNum': 1, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 @@ -223,7 +214,7 @@ class TDTestCase: tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) consumerId = 0 - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] * 2 topicList = topicFromStb1 ifcheckdata = 0 ifManualCommit = 1 @@ -247,8 +238,8 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows != 0: - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0)) + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") # tdLog.info("start consume 1 processor") diff --git a/tests/system-test/7-tmq/tmq_replay.py b/tests/system-test/7-tmq/tmq_replay.py new file mode 100644 index 0000000000..1e19d58516 --- /dev/null +++ b/tests/system-test/7-tmq/tmq_replay.py @@ -0,0 +1,39 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + + def run(self): + tdSql.prepare() + buildPath = tdCom.getBuildPath() + + cmdStr1 = '%s/build/bin/replay_test'%(buildPath) + tdLog.info(cmdStr1) + result = os.system(cmdStr1) + + if result != 0: + tdLog.exit("tmq_replay error!") + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index 343e3d8454..db5eb21ad8 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -9,6 +9,7 @@ add_executable(get_db_name_test get_db_name_test.c) add_executable(tmq_offset tmqOffset.c) add_executable(tmq_offset_test tmq_offset_test.c) add_executable(varbinary_test varbinary_test.c) +add_executable(replay_test replay_test.c) if(${TD_LINUX}) add_executable(tsz_test tsz_test.c) @@ -57,6 +58,14 @@ target_link_libraries( PUBLIC os ) +target_link_libraries( + replay_test + PUBLIC taos + PUBLIC util + PUBLIC common + PUBLIC os +) + target_link_libraries( write_raw_block_test PUBLIC taos diff --git a/utils/test/c/replay_test.c b/utils/test/c/replay_test.c new file mode 100644 index 0000000000..1fbaac0796 --- /dev/null +++ b/utils/test/c/replay_test.c @@ -0,0 +1,323 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include "taos.h" +#include "types.h" + +tmq_t* build_consumer() { + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "g1"); + tmq_conf_set(conf, "client.id", "c1"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "enable.replay", "true"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + assert(tmq); + tmq_conf_destroy(conf); + return tmq; +} + +void test_vgroup_error(TAOS* pConn){ + TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists d1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists d1 vgroups 2 wal_retention_period 3600"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create topic t1 as select * from d1.s1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + tmq_list_t* topic_list = tmq_list_new(); + + tmq_list_append(topic_list, "t1"); + tmq_t* tmq = build_consumer(); + ASSERT(tmq_subscribe(tmq, topic_list) != 0); + tmq_list_destroy(topic_list); + tmq_consumer_close(tmq); +} + +void test_stable_db_error(TAOS* pConn){ + TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists d1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists d1 vgroups 1 wal_retention_period 3600"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create topic t1 as stable d1.s1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + tmq_list_t* topic_list = tmq_list_new(); + + tmq_list_append(topic_list, "t1"); + tmq_t* tmq = build_consumer(); + ASSERT(tmq_subscribe(tmq, topic_list) != 0); + tmq_list_destroy(topic_list); + tmq_consumer_close(tmq); + + pRes = taos_query(pConn, "drop topic if exists t1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create topic t1 as database d1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + topic_list = tmq_list_new(); + tmq_list_append(topic_list, "t1"); + tmq = build_consumer(); + ASSERT(tmq_subscribe(tmq, topic_list) != 0); + tmq_list_destroy(topic_list); + tmq_consumer_close(tmq); +} + +void insert_with_sleep(TAOS* pConn, int32_t* interval, int32_t len){ + for(int i = 0; i < len; i++){ + TAOS_RES* pRes = taos_query(pConn, "insert into d1.table1 (ts, c1) values (now, 1)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + taosMsleep(interval[i]); + } +} + +void insert_with_sleep_multi(TAOS* pConn, int32_t* interval, int32_t len){ + for(int i = 0; i < len; i++){ + TAOS_RES* pRes = taos_query(pConn, "insert into d1.table1 (ts, c1) values (now, 1) (now+1s, 2) d1.table2 (ts, c1) values (now, 1) (now+1s, 2)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + taosMsleep(interval[i]); + } +} + +void test_case1(TAOS* pConn, int32_t* interval, int32_t len){ + TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists d1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists d1 vgroups 2 wal_retention_period 3600"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d1.table1 using d1.s1 tags(1)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + insert_with_sleep(pConn, interval, len); + + pRes = taos_query(pConn, "create topic t1 as select * from d1.table1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + tmq_list_t* topic_list = tmq_list_new(); + + tmq_list_append(topic_list, "t1"); + tmq_t* tmq = build_consumer(); + // 启动订阅 + tmq_subscribe(tmq, topic_list); + tmq_list_destroy(topic_list); + + int32_t timeout = 5000; + + int64_t t = 0; + int32_t totalRows = 0; + char buf[1024] = {0}; + while (1) { + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, timeout); + if (tmqmessage) { + if(t != 0){ + ASSERT(taosGetTimestampMs() - t >= interval[totalRows - 1]); + } + t = taosGetTimestampMs(); + + TAOS_ROW row = taos_fetch_row(tmqmessage); + if (row == NULL) { + break; + } + + TAOS_FIELD* fields = taos_fetch_fields(tmqmessage); + int32_t numOfFields = taos_field_count(tmqmessage); + const char* tbName = tmq_get_table_name(tmqmessage); + taos_print_row(buf, row, fields, numOfFields); + + printf("%lld tbname:%s, rows[%d]: %s\n", t, (tbName != NULL ? tbName : "null table"), totalRows, buf); + totalRows++; + taos_free_result(tmqmessage); + } else { + break; + } + } + + ASSERT(totalRows == len); + tmq_consumer_close(tmq); +} + +void test_case2(TAOS* pConn, int32_t* interval, int32_t len, tsem_t* sem){ + TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists d1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists d1 vgroups 1 wal_retention_period 3600"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d1.table1 using d1.s1 tags(1)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d1.table2 using d1.s1 tags(2)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + insert_with_sleep_multi(pConn, interval, len); + + pRes = taos_query(pConn, "create topic t1 as select * from d1.s1"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + tmq_list_t* topic_list = tmq_list_new(); + + tmq_list_append(topic_list, "t1"); + tmq_t* tmq = build_consumer(); + // 启动订阅 + tmq_subscribe(tmq, topic_list); + tmq_list_destroy(topic_list); + + int32_t timeout = 5000; + + int64_t t = 0; + int32_t totalRows = 0; + char buf[1024] = {0}; + while (1) { + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, timeout); + if (tmqmessage) { + if(t != 0 && totalRows % 4 == 0){ + ASSERT(taosGetTimestampMs() - t >= interval[totalRows/4 - 1]); + } + t = taosGetTimestampMs(); + + while(1){ + TAOS_ROW row = taos_fetch_row(tmqmessage); + if (row == NULL) { + break; + } + + TAOS_FIELD* fields = taos_fetch_fields(tmqmessage); + int32_t numOfFields = taos_field_count(tmqmessage); + const char* tbName = tmq_get_table_name(tmqmessage); + taos_print_row(buf, row, fields, numOfFields); + + printf("%lld tbname:%s, rows[%d]: %s\n", t, (tbName != NULL ? tbName : "null table"), totalRows, buf); + totalRows++; + } + + taos_free_result(tmqmessage); + + if(totalRows == len * 4){ + taosSsleep(1); + tsem_post(sem); + } + } else { + break; + } + } + + ASSERT(totalRows == len * 4 + 1); + tmq_consumer_close(tmq); +} + +void* insertThreadFunc(void* param) { + tsem_t* sem = (tsem_t*)param; + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + + tsem_wait(sem); + + TAOS_RES* pRes = taos_query(pConn, "insert into d1.table1 (ts, c1) values (now, 11)"); + ASSERT(taos_errno(pRes) == 0); + printf("insert data again\n"); + taos_free_result(pRes); + taos_close(pConn); + return NULL; +} + +int main(int argc, char* argv[]) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + test_vgroup_error(pConn); + test_stable_db_error(pConn); + + tsem_t sem; + tsem_init(&sem, 0, 0); + TdThread thread; + TdThreadAttr thattr; + taosThreadAttrInit(&thattr); + taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); + + // pthread_create one thread to consume + taosThreadCreate(&thread, &thattr, insertThreadFunc, (void*)(&sem)); + + int32_t interval[5] = {1000, 200, 3000, 40, 500}; + test_case1(pConn, interval, sizeof(interval)/sizeof(int32_t)); + printf("test_case1 success\n"); + test_case2(pConn, interval, sizeof(interval)/sizeof(int32_t), &sem); + taos_close(pConn); + + taosThreadJoin(thread, NULL); + taosThreadClear(&thread); + tsem_destroy(&sem); + return 0; +} diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index 6b774b3eff..34f4a9d094 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -621,10 +621,11 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn taos_print_row(buf, row, fields, numOfFields); if (0 != g_stConfInfo.showRowFlag) { - taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf); + taosFprintfFile(g_fp, "%lld tbname:%s, rows[%d]: %s\n", taosGetTimestampMs(), (tbName != NULL ? tbName : "null table"), totalRows, buf); // if (0 != g_stConfInfo.saveRowFlag) { // saveConsumeContentToTbl(pInfo, buf); // } +// taosFsyncFile(g_fp); } totalRows++;