From 286b9e40715547c6a80a7ded567846550ddeaea0 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Mon, 20 Jun 2022 20:54:43 +0800 Subject: [PATCH] test: add notify between main script and comsume processor --- tests/system-test/7-tmq/subscribeDb3.py | 38 +++++++++-- tests/test/c/tmqSim.c | 89 ++++++++++++++++--------- 2 files changed, 89 insertions(+), 38 deletions(-) diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index 6973f4c51f..b576a0ea70 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -54,9 +54,11 @@ class TDTestCase: tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + tdSql.query("drop table if exists %s.notifyinfo "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + tdSql.query("create table %s.notifyinfo (ts timestamp, cmdid int, consumerid int)"%cdbName) def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): sql = "insert into %s.consumeinfo values "%cdbName @@ -64,6 +66,27 @@ class TDTestCase: tdLog.info("consume info sql: %s"%sql) tdSql.query(sql) + def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'): + while 1: + tdSql.query("select * from %s.notifyinfo"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if (tdSql.getRows() == 1) and (tdSql.getData(0, 1) == 0): + break + else: + time.sleep(0.1) + return + + def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'): + while 1: + tdSql.query("select * from %s.notifyinfo"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 2 : + print(tdSql.getData(0, 1), tdSql.getData(1, 1)) + if tdSql.getData(1, 1) == 1: + break + time.sleep(0.1) + return + def selectConsumeResult(self,expectRows,cdbName='cdb'): resultList=[] while 1: @@ -72,7 +95,7 @@ class TDTestCase: if tdSql.getRows() == expectRows: break else: - time.sleep(5) + time.sleep(1) for i in range(expectRows): tdLog.info ("ts: %s, consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 0), tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) @@ -207,7 +230,9 @@ class TDTestCase: showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - time.sleep(2) + tdLog.info("wait the notify info of start consume") + self.getStartConsumeNotifyFromTmqsim() + tdLog.info("pkill consume processor") if (platform.system().lower() == 'windows'): os.system("TASKKILL /F /IM tmq_sim.exe") @@ -282,14 +307,17 @@ class TDTestCase: showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - time.sleep(6) + # time.sleep(6) + tdLog.info("start to wait commit notify") + self.getStartCommitNotifyFromTmqsim() + tdLog.info("pkill consume processor") if (platform.system().lower() == 'windows'): os.system("TASKKILL /F /IM tmq_sim.exe") else: os.system('pkill tmq_sim') - expectRows = 0 - resultList = self.selectConsumeResult(expectRows) + # expectRows = 0 + # resultList = self.selectConsumeResult(expectRows) # wait for data ready prepareEnvThread.join() diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 0f78a003d6..948df3a40a 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -34,6 +34,12 @@ #define MAX_CONSUMER_THREAD_CNT (16) #define MAX_VGROUP_CNT (32) +typedef enum { + NOTIFY_CMD_START_CONSUM, + NOTIFY_CMD_START_COMMIT, + NOTIFY_CMD_ID_BUTT +}NOTIFY_CMD_ID; + typedef struct { TdThread thread; int32_t consumerId; @@ -67,6 +73,8 @@ typedef struct { int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume int64_t ts; + TAOS* taos; + } SThreadInfo; typedef struct { @@ -339,8 +347,37 @@ int queryDB(TAOS* taos, char* command) { return 0; } +static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) { +} + +int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { + char sqlStr[1024] = {0}; + + int64_t now = taosGetTimestampMs(); + + // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int + sprintf(sqlStr, "insert into %s.notifyinfo values (%"PRId64", %d, %d)", + g_stConfInfo.cdbName, + now, + cmdId, + pInfo->consumerId); + + taos_query_a(pInfo->taos, sqlStr, appNothing, NULL); + + taosFprintfFile(g_fp, "notifyMainScript success, sql: %s\n", sqlStr); + + return 0; +} + +static int32_t g_once_commit_flag = 0; static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { - pError("tmq_commit_cb_print() commit %d\n", code); + pError("tmq_commit_cb_print() commit %d\n", code); + + if (0 == g_once_commit_flag) { + g_once_commit_flag = 1; + notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); + } + taosFprintfFile(g_fp, "tmq_commit_cb_print() be called\n"); } void build_consumer(SThreadInfo* pInfo) { @@ -353,7 +390,7 @@ void build_consumer(SThreadInfo* pInfo) { // tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); - tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, pInfo); // tmq_conf_set(conf, "group.id", "cgrp1"); for (int32_t i = 0; i < pInfo->numOfKey; i++) { @@ -392,9 +429,6 @@ void build_topic_list(SThreadInfo* pInfo) { int32_t saveConsumeResult(SThreadInfo* pInfo) { char sqlStr[1024] = {0}; - TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); - assert(pConn != NULL); - int64_t now = taosGetTimestampMs(); // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int @@ -404,7 +438,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { char tmpString[128]; taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr); - TAOS_RES* pRes = taos_query(pConn, sqlStr); + TAOS_RES* pRes = taos_query(pInfo->taos, sqlStr); if (taos_errno(pRes) != 0) { pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); @@ -413,38 +447,14 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { taos_free_result(pRes); -#if 0 - // vgroups - for (i = 0; i < pInfo->numOfVgroups; i++) { - // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int - sprintf(sqlStr, "insert into %s.vgroup_%d values (%"PRId64", %d, %" PRId64 ", %" PRId64 ", %d)", - g_stConfInfo.cdbName, - now, - pInfo->consumerId, - pInfo->consumeMsgCnt, - pInfo->consumeRowCnt, - pInfo->checkresult); - - char tmpString[128]; - taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId ,sqlStr); - - TAOS_RES* pRes = taos_query(pConn, sqlStr); - if (taos_errno(pRes) != 0) { - pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - exit(-1); - } - - taos_free_result(pRes); - } -#endif - return 0; } void loop_consume(SThreadInfo* pInfo) { int32_t code; + int32_t once_flag = 0; + int64_t totalMsgs = 0; int64_t totalRows = 0; @@ -465,6 +475,11 @@ void loop_consume(SThreadInfo* pInfo) { totalMsgs++; + if (0 == once_flag) { + once_flag = 1; + notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM); + } + if (totalRows >= pInfo->expectMsgCnt) { char tmpString[128]; taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString)); @@ -489,6 +504,12 @@ void* consumeThreadFunc(void* param) { SThreadInfo* pInfo = (SThreadInfo*)param; + pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0); + if (pInfo->taos == NULL) { + taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n"); + exit(-1); + } + build_consumer(pInfo); build_topic_list(pInfo); if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { @@ -508,7 +529,6 @@ void* consumeThreadFunc(void* param) { loop_consume(pInfo); if (pInfo->ifManualCommit) { - taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n"); pPrint("tmq_commit() manual commit when consume end.\n"); /*tmq_commit(pInfo->tmq, NULL, 0);*/ tmq_commit_sync(pInfo->tmq, NULL); @@ -539,6 +559,9 @@ void* consumeThreadFunc(void* param) { taosFprintfFile(g_fp, "vgroups: %04d, rows: %d\n", pInfo->rowsOfPerVgroups[i][0], pInfo->rowsOfPerVgroups[i][1]); } + taos_close(pInfo->taos); + pInfo->taos = NULL; + return NULL; }