From 4511f5828fb3f5e650acc5405366df9611722cfc Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 12 May 2022 15:34:08 +0800 Subject: [PATCH 1/5] test: add test case for tmq --- tests/system-test/7-tmq/basic5.py | 85 ++- tests/system-test/fulltest.sh | 4 + tests/test/c/tmqSim.c | 1026 +++++++++++++++-------------- 3 files changed, 608 insertions(+), 507 deletions(-) diff --git a/tests/system-test/7-tmq/basic5.py b/tests/system-test/7-tmq/basic5.py index 99aa4e72aa..7ed9f7ebe7 100644 --- a/tests/system-test/7-tmq/basic5.py +++ b/tests/system-test/7-tmq/basic5.py @@ -61,7 +61,7 @@ class TDTestCase: tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) return - def insert_data(self,dbName,stbName,ctbNum,rowsPerTbl,startTs): + def insert_data(self,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs): tdLog.debug("start to insert data ............") tdSql.execute("use %s" %dbName) pre_insert = "insert into " @@ -72,13 +72,15 @@ class TDTestCase: sql += " %s_%d values "%(stbName,i) for j in range(rowsPerTbl): sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) - if (j > 0) and (j%2000 == 0): + if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): tdSql.execute(sql) - sql = "insert into %s_%d values " %(stbName,i) + if j < rowsPerTbl - 1: + sql = "insert into %s_%d values " %(stbName,i) + else: + sql = "insert into " #end sql if sql != pre_insert: - # print(sql) - print("sql:%s"%sql) + #print("insert sql:%s"%sql) tdSql.execute(sql) tdLog.debug("insert data ............ [OK]") return @@ -96,6 +98,7 @@ class TDTestCase: parameterDict["stbName"],\ parameterDict["ctbNum"],\ parameterDict["rowsPerTbl"],\ + parameterDict["batchNum"],\ parameterDict["startTs"]) return @@ -117,13 +120,81 @@ class TDTestCase: 'vgroups': 1, \ 'stbName': 'stb', \ 'ctbNum': 10, \ - 'rowsPerTbl': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 10, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() + time.sleep(1) + # wait stb ready + while 1: + tdSql.query("show %s.stables"%parameterDict['dbName']) + if tdSql.getRows() == 1: + #if (self.queryRows == 1): + time.sleep(1) + break + + tdLog.info("create topics from super table") + topicFromStb = 'topic_stb_column' + topicFromCtb = 'topic_ctb_column' + + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) + + tdSql.query("show topics") + tdSql.checkRows(2) + topic1 = tdSql.getData(0 , 0) + topic2 = tdSql.getData(1 , 0) + if topic1 != topicFromStb or topic1 != topicFromCtb: + tdLog.exit("topic error") + if topic2 != topicFromStb or topic2 != topicFromCtb: + tdLog.exit("topic error") + + tdLog.info("create consume info table and consume result table") + cdbName = 'cdb' + tdSql.query("create database %s"%cdbName) + tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)") + tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)") + + consumerId = 0 + expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] + 1) * parameterDict["ctbNum"] + topicList = topicFromStb + ifcheckdata = 0 + keyList = 'group.id:cgrp1, \ + enable.auto.commit:false, \ + auto.commit.interval.ms:6000, \ + auto.offset.reset:none' + sql = "insert into consumeinfo values " + sql += "(now, %d, '%s', '%s', %l64d, %d)"%(consumerId, topicList, keyList, expectmsgcnt, ifcheckdata) + tdSql.query(sql) + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s, -g %d, -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(taosCmd) + # wait for data ready - prepareEnvThread.join() + prepareEnvThread.join() + + tdLog.info("check consume result") + while 1: + tdSql.query("select * from consumeresult") + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 1: + #if (self.queryRows == 1): + time.sleep(1) + break + + tdSql.checkData(0 , 1, consumerId) + tdSql.checkData(0 , 2, expectmsgcnt) + tdSql.checkData(0 , 3, expectrowcnt) tdLog.printNoPrefix("======== test scenario 2: ") diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index a6b4408cdc..817f814873 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -51,3 +51,7 @@ python3 ./test.py -f 2-query/arcsin.py python3 ./test.py -f 2-query/arccos.py python3 ./test.py -f 2-query/arctan.py # python3 ./test.py -f 2-query/query_cols_tags_and_or.py + +python3 ./test.py -f 7-tmq/basic5.py + + diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 4a59d18d87..bc3aa091c3 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -1,500 +1,526 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include -#include -#include -#include -#include -#include -#include - -#include "taos.h" -#include "taoserror.h" -#include "tlog.h" - -#define GREEN "\033[1;32m" -#define NC "\033[0m" -#define min(a, b) (((a) < (b)) ? (a) : (b)) - -#define MAX_SQL_STR_LEN (1024 * 1024) -#define MAX_ROW_STR_LEN (16 * 1024) -#define MAX_CONSUMER_THREAD_CNT (16) - -typedef struct { - TdThread thread; - int32_t consumerId; - - int32_t ifCheckData; - int64_t expectMsgCnt; - - int64_t consumeMsgCnt; - int64_t consumeRowCnt; - int32_t checkresult; - - char topicString[1024]; - char keyString[1024]; - - int32_t numOfTopic; - char topics[32][64]; - - int32_t numOfKey; - char key[32][64]; - char value[32][64]; - - tmq_t* tmq; - tmq_list_t* topicList; - -} SThreadInfo; - -typedef struct { - // input from argvs - char cdbName[32]; - char dbName[32]; - int32_t showMsgFlag; - int32_t showRowFlag; - int32_t consumeDelay; // unit s - int32_t numOfThread; - SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; -} SConfInfo; - -static SConfInfo g_stConfInfo; -TdFilePtr g_fp = NULL; - -// char* g_pRowValue = NULL; -// TdFilePtr g_fp = NULL; - -static void printHelp() { - char indent[10] = " "; - printf("Used to test the tmq feature with sim cases\n"); - - printf("%s%s\n", indent, "-c"); - printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir); - printf("%s%s\n", indent, "-d"); - printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default "); - printf("%s%s\n", indent, "-g"); - printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag); - printf("%s%s\n", indent, "-r"); - printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag); - printf("%s%s\n", indent, "-y"); - printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay); - exit(EXIT_SUCCESS); -} - -void initLogFile() { - // FILE *fp = fopen(g_stConfInfo.resultFileName, "a"); - char file[256]; - sprintf(file, "%s/../log/tmqlog.txt", configDir); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); - if (NULL == pFile) { - fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt"); - exit(-1); - } - g_fp = pFile; -} - -void saveConfigToLogFile() { - time_t tTime = taosGetTimestampSec(); - struct tm tm = *taosLocalTime(&tTime, NULL); - - taosFprintfFile(g_fp, "###################################################################\n"); - taosFprintfFile(g_fp, "# configDir: %s\n", configDir); - taosFprintfFile(g_fp, "# dbName: %s\n", g_stConfInfo.dbName); - taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName); - taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag); - taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag); - taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay); - taosFprintfFile(g_fp, "# numOfThread: %d\n", g_stConfInfo.numOfThread); - - for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { - taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); - taosFprintfFile(g_fp, " Topics: "); - for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) { - taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]); - } - taosFprintfFile(g_fp, "\n"); - taosFprintfFile(g_fp, " Key: "); - for (int k = 0; k < g_stConfInfo.stThreads[i].numOfKey; k++) { - taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]); - } - taosFprintfFile(g_fp, "\n"); - } - - taosFprintfFile(g_fp, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, - tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); - taosFprintfFile(g_fp, "###################################################################\n"); -} - -void parseArgument(int32_t argc, char* argv[]) { - memset(&g_stConfInfo, 0, sizeof(SConfInfo)); - g_stConfInfo.showMsgFlag = 0; - g_stConfInfo.showRowFlag = 0; - g_stConfInfo.consumeDelay = 5; - - for (int32_t i = 1; i < argc; i++) { - if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { - printHelp(); - exit(0); - } else if (strcmp(argv[i], "-d") == 0) { - strcpy(g_stConfInfo.dbName, argv[++i]); - } else if (strcmp(argv[i], "-w") == 0) { - strcpy(g_stConfInfo.cdbName, argv[++i]); - } else if (strcmp(argv[i], "-c") == 0) { - strcpy(configDir, argv[++i]); - } else if (strcmp(argv[i], "-g") == 0) { - g_stConfInfo.showMsgFlag = atol(argv[++i]); - } else if (strcmp(argv[i], "-r") == 0) { - g_stConfInfo.showRowFlag = atol(argv[++i]); - } else if (strcmp(argv[i], "-y") == 0) { - g_stConfInfo.consumeDelay = atol(argv[++i]); - } else { - printf("%s unknow para: %s %s", GREEN, argv[++i], NC); - exit(-1); - } - } - - initLogFile(); - - taosFprintfFile(g_fp, "====parseArgument() success\n"); - -#if 1 - pPrint("%s configDir:%s %s", GREEN, configDir, NC); - pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC); - pPrint("%s cdbName:%s %s", GREEN, g_stConfInfo.cdbName, NC); - pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC); - pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); - pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC); -#endif -} - -void splitStr(char** arr, char* str, const char* del) { - char* s = strtok(str, del); - while (s != NULL) { - *arr++ = s; - s = strtok(NULL, del); - } -} - -void ltrim(char* str) { - if (str == NULL || *str == '\0') { - return; - } - int len = 0; - char* p = str; - while (*p != '\0' && isspace(*p)) { - ++p; - ++len; - } - memmove(str, p, strlen(str) - len + 1); - // return str; -} - -static int running = 1; -static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) { - char buf[1024]; - int32_t totalRows = 0; - - // printf("topic: %s\n", tmq_get_topic_name(msg)); - // printf("vg:%d\n", tmq_get_vgroup_id(msg)); - taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", msgIndex, threadLable); - taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg)); - - while (1) { - TAOS_ROW row = taos_fetch_row(msg); - if (row == NULL) break; - if (0 != g_stConfInfo.showRowFlag) { - TAOS_FIELD* fields = taos_fetch_fields(msg); - int32_t numOfFields = taos_field_count(msg); - taos_print_row(buf, row, fields, numOfFields); - taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); - } - totalRows++; - } - - return totalRows; -} - -int queryDB(TAOS* taos, char* command) { - TAOS_RES* pRes = taos_query(taos, command); - int code = taos_errno(pRes); - // if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { - if (code != 0) { - pError("failed to reason:%s, sql: %s", tstrerror(code), command); - taos_free_result(pRes); - return -1; - } - taos_free_result(pRes); - return 0; -} - -static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) { - printf("tmq_commit_cb_print() commit %d\n", resp); -} - -void build_consumer(SThreadInfo* pInfo) { - tmq_conf_t* conf = tmq_conf_new(); - - // tmq_conf_set(conf, "td.connect.ip", "localhost"); - // tmq_conf_set(conf, "td.connect.port", "6030"); - tmq_conf_set(conf, "td.connect.user", "root"); - tmq_conf_set(conf, "td.connect.pass", "taosdata"); - - tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); - - tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL); - - // tmq_conf_set(conf, "group.id", "cgrp1"); - for (int32_t i = 0; i < pInfo->numOfKey; i++) { - tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]); - } - - // tmq_conf_set(conf, "client.id", "c-001"); - - // tmq_conf_set(conf, "enable.auto.commit", "true"); - // tmq_conf_set(conf, "enable.auto.commit", "false"); - - // tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - - // tmq_conf_set(conf, "auto.offset.reset", "none"); - // tmq_conf_set(conf, "auto.offset.reset", "earliest"); - // tmq_conf_set(conf, "auto.offset.reset", "latest"); - - pInfo->tmq = tmq_consumer_new(conf, NULL, 0); - return; -} - -void build_topic_list(SThreadInfo* pInfo) { - pInfo->topicList = tmq_list_new(); - // tmq_list_append(topic_list, "test_stb_topic_1"); - for (int32_t i = 0; i < pInfo->numOfTopic; i++) { - tmq_list_append(pInfo->topicList, pInfo->topics[i]); - } - return; -} - -int32_t saveConsumeResult(SThreadInfo* pInfo) { - char sqlStr[1024] = {0}; - - TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int - sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName, - pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult); - - TAOS_RES* pRes = taos_query(pConn, sqlStr); - if (taos_errno(pRes) != 0) { - printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - exit(-1); - } - - taos_free_result(pRes); - - return 0; -} - -void loop_consume(SThreadInfo* pInfo) { - tmq_resp_err_t err; - - int64_t totalMsgs = 0; - int64_t totalRows = 0; - - while (running) { - TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000); - if (tmqMsg) { - if (0 != g_stConfInfo.showMsgFlag) { - totalRows += msg_process(tmqMsg, totalMsgs, pInfo->consumerId); - } - - taos_free_result(tmqMsg); - - totalMsgs++; - - if (totalMsgs >= pInfo->expectMsgCnt) { - break; - } - } else { - break; - } - } - - pInfo->consumeMsgCnt = totalMsgs; - pInfo->consumeRowCnt = totalRows; - - taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n", - pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt); -} - -void* consumeThreadFunc(void* param) { - int32_t totalMsgs = 0; - - SThreadInfo* pInfo = (SThreadInfo*)param; - - build_consumer(pInfo); - build_topic_list(pInfo); - if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { - return NULL; - } - - tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); - if (err) { - printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); - exit(-1); - } - - loop_consume(pInfo); - - tmq_commit(pInfo->tmq, NULL, 0); - - err = tmq_unsubscribe(pInfo->tmq); - if (err) { - printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); - pInfo->consumeMsgCnt = -1; - return NULL; - } - - err = tmq_consumer_close(pInfo->tmq); - if (err) { - printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); - exit(-1); - } - pInfo->tmq = NULL; - - // save consume result into consumeresult table - saveConsumeResult(pInfo); - - return NULL; -} - -void parseConsumeInfo() { - char* token; - const char delim[2] = ","; - const char ch = ':'; - - for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { - token = strtok(g_stConfInfo.stThreads[i].topicString, delim); - while (token != NULL) { - // printf("%s\n", token ); - strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token); - ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]); - // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); - g_stConfInfo.stThreads[i].numOfTopic++; - - token = strtok(NULL, delim); - } - - token = strtok(g_stConfInfo.stThreads[i].keyString, delim); - while (token != NULL) { - // printf("%s\n", token ); - { - char* pstr = token; - ltrim(pstr); - char* ret = strchr(pstr, ch); - memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr); - strcpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1); - // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], - // g_stConfInfo.value[g_stConfInfo.numOfKey]); - g_stConfInfo.stThreads[i].numOfKey++; - } - - token = strtok(NULL, delim); - } - } -} - -int32_t getConsumeInfo() { - char sqlStr[1024] = {0}; - - TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName); - TAOS_RES* pRes = taos_query(pConn, sqlStr); - if (taos_errno(pRes) != 0) { - printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); - taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); - taosCloseFile(&g_fp); - taos_free_result(pRes); - exit(-1); - } - - TAOS_ROW row = NULL; - int num_fields = taos_num_fields(pRes); - TAOS_FIELD* fields = taos_fetch_fields(pRes); - - // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, - // ifcheckdata int - - int32_t numOfThread = 0; - while ((row = taos_fetch_row(pRes))) { - int32_t* lengths = taos_fetch_lengths(pRes); - - for (int i = 0; i < num_fields; ++i) { - if (row[i] == NULL || 0 == i) { - continue; - } - - if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { - g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]); - } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { - memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]); - } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { - memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]); - } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) { - g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]); - } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { - g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]); - } - } - numOfThread++; - } - g_stConfInfo.numOfThread = numOfThread; - - taos_free_result(pRes); - - parseConsumeInfo(); - - return 0; -} - -int main(int32_t argc, char* argv[]) { - parseArgument(argc, argv); - getConsumeInfo(); - saveConfigToLogFile(); - - TdThreadAttr thattr; - taosThreadAttrInit(&thattr); - taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); - - // pthread_create one thread to consume - taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread); - for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) { - taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, - (void*)(&(g_stConfInfo.stThreads[i]))); - } - - for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { - taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); - } - - // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); - - taosFprintfFile(g_fp, "==== close tmqlog ====\n"); - taosCloseFile(&g_fp); - - return 0; -} - +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "taos.h" +#include "taoserror.h" +#include "tlog.h" + +#define GREEN "\033[1;32m" +#define NC "\033[0m" +#define min(a, b) (((a) < (b)) ? (a) : (b)) + +#define MAX_SQL_STR_LEN (1024 * 1024) +#define MAX_ROW_STR_LEN (16 * 1024) +#define MAX_CONSUMER_THREAD_CNT (16) + +typedef struct { + TdThread thread; + int32_t consumerId; + + int32_t autoCommitIntervalMs; // 1000 ms + char autoCommit[8]; // true, false + char autoOffsetRest[16]; // none, earliest, latest + + int32_t ifCheckData; + int64_t expectMsgCnt; + + int64_t consumeMsgCnt; + int64_t consumeRowCnt; + int32_t checkresult; + + char topicString[1024]; + char keyString[1024]; + + int32_t numOfTopic; + char topics[32][64]; + + int32_t numOfKey; + char key[32][64]; + char value[32][64]; + + tmq_t* tmq; + tmq_list_t* topicList; + +} SThreadInfo; + +typedef struct { + // input from argvs + char cdbName[32]; + char dbName[32]; + int32_t showMsgFlag; + int32_t showRowFlag; + int32_t consumeDelay; // unit s + int32_t numOfThread; + SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; +} SConfInfo; + +static SConfInfo g_stConfInfo; +TdFilePtr g_fp = NULL; + +// char* g_pRowValue = NULL; +// TdFilePtr g_fp = NULL; + +static void printHelp() { + char indent[10] = " "; + printf("Used to test the tmq feature with sim cases\n"); + + printf("%s%s\n", indent, "-c"); + printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir); + printf("%s%s\n", indent, "-d"); + printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default "); + printf("%s%s\n", indent, "-g"); + printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag); + printf("%s%s\n", indent, "-r"); + printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag); + printf("%s%s\n", indent, "-y"); + printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay); + exit(EXIT_SUCCESS); +} + +void initLogFile() { + // FILE *fp = fopen(g_stConfInfo.resultFileName, "a"); + char file[256]; + sprintf(file, "%s/../log/tmqlog.txt", configDir); + TdFilePtr pFile = taosOpenFile(file, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); + if (NULL == pFile) { + fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt"); + exit(-1); + } + g_fp = pFile; +} + +void saveConfigToLogFile() { + time_t tTime = taosGetTimestampSec(); + struct tm tm = *taosLocalTime(&tTime, NULL); + + taosFprintfFile(g_fp, "###################################################################\n"); + taosFprintfFile(g_fp, "# configDir: %s\n", configDir); + taosFprintfFile(g_fp, "# dbName: %s\n", g_stConfInfo.dbName); + taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName); + taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag); + taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag); + taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay); + taosFprintfFile(g_fp, "# numOfThread: %d\n", g_stConfInfo.numOfThread); + + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { + taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); + taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit); + taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs); + taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest); + taosFprintfFile(g_fp, " Topics: "); + for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) { + taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]); + } + taosFprintfFile(g_fp, "\n"); + taosFprintfFile(g_fp, " Key: "); + for (int k = 0; k < g_stConfInfo.stThreads[i].numOfKey; k++) { + taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]); + } + taosFprintfFile(g_fp, "\n"); + } + + taosFprintfFile(g_fp, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, + tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); + taosFprintfFile(g_fp, "###################################################################\n"); +} + +void parseArgument(int32_t argc, char* argv[]) { + memset(&g_stConfInfo, 0, sizeof(SConfInfo)); + g_stConfInfo.showMsgFlag = 0; + g_stConfInfo.showRowFlag = 0; + g_stConfInfo.consumeDelay = 5; + + for (int32_t i = 1; i < argc; i++) { + if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { + printHelp(); + exit(0); + } else if (strcmp(argv[i], "-d") == 0) { + strcpy(g_stConfInfo.dbName, argv[++i]); + } else if (strcmp(argv[i], "-w") == 0) { + strcpy(g_stConfInfo.cdbName, argv[++i]); + } else if (strcmp(argv[i], "-c") == 0) { + strcpy(configDir, argv[++i]); + } else if (strcmp(argv[i], "-g") == 0) { + g_stConfInfo.showMsgFlag = atol(argv[++i]); + } else if (strcmp(argv[i], "-r") == 0) { + g_stConfInfo.showRowFlag = atol(argv[++i]); + } else if (strcmp(argv[i], "-y") == 0) { + g_stConfInfo.consumeDelay = atol(argv[++i]); + } else { + printf("%s unknow para: %s %s", GREEN, argv[++i], NC); + exit(-1); + } + } + + initLogFile(); + + taosFprintfFile(g_fp, "====parseArgument() success\n"); + +#if 1 + pPrint("%s configDir:%s %s", GREEN, configDir, NC); + pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC); + pPrint("%s cdbName:%s %s", GREEN, g_stConfInfo.cdbName, NC); + pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC); + pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); + pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC); +#endif +} + +void splitStr(char** arr, char* str, const char* del) { + char* s = strtok(str, del); + while (s != NULL) { + *arr++ = s; + s = strtok(NULL, del); + } +} + +void ltrim(char* str) { + if (str == NULL || *str == '\0') { + return; + } + int len = 0; + char* p = str; + while (*p != '\0' && isspace(*p)) { + ++p; + ++len; + } + memmove(str, p, strlen(str) - len + 1); + // return str; +} + +static int running = 1; +static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) { + char buf[1024]; + int32_t totalRows = 0; + + // printf("topic: %s\n", tmq_get_topic_name(msg)); + // printf("vg:%d\n", tmq_get_vgroup_id(msg)); + taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", msgIndex, threadLable); + taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg)); + + while (1) { + TAOS_ROW row = taos_fetch_row(msg); + if (row == NULL) break; + if (0 != g_stConfInfo.showRowFlag) { + TAOS_FIELD* fields = taos_fetch_fields(msg); + int32_t numOfFields = taos_field_count(msg); + taos_print_row(buf, row, fields, numOfFields); + taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); + } + totalRows++; + } + + return totalRows; +} + +int queryDB(TAOS* taos, char* command) { + TAOS_RES* pRes = taos_query(taos, command); + int code = taos_errno(pRes); + // if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { + if (code != 0) { + pError("failed to reason:%s, sql: %s", tstrerror(code), command); + taos_free_result(pRes); + return -1; + } + taos_free_result(pRes); + return 0; +} + +static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) { + printf("tmq_commit_cb_print() commit %d\n", resp); +} + +void build_consumer(SThreadInfo* pInfo) { + tmq_conf_t* conf = tmq_conf_new(); + + // tmq_conf_set(conf, "td.connect.ip", "localhost"); + // tmq_conf_set(conf, "td.connect.port", "6030"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + + //tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); + + tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL); + + // tmq_conf_set(conf, "group.id", "cgrp1"); + for (int32_t i = 0; i < pInfo->numOfKey; i++) { + tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]); + } + + // tmq_conf_set(conf, "client.id", "c-001"); + + // tmq_conf_set(conf, "enable.auto.commit", "true"); + // tmq_conf_set(conf, "enable.auto.commit", "false"); + + // tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); + + // tmq_conf_set(conf, "auto.offset.reset", "none"); + // tmq_conf_set(conf, "auto.offset.reset", "earliest"); + // tmq_conf_set(conf, "auto.offset.reset", "latest"); + + pInfo->tmq = tmq_consumer_new(conf, NULL, 0); + + tmq_conf_destroy(conf); + + return; +} + +void build_topic_list(SThreadInfo* pInfo) { + pInfo->topicList = tmq_list_new(); + // tmq_list_append(topic_list, "test_stb_topic_1"); + for (int32_t i = 0; i < pInfo->numOfTopic; i++) { + tmq_list_append(pInfo->topicList, pInfo->topics[i]); + } + return; +} + +int32_t saveConsumeResult(SThreadInfo* pInfo) { + char sqlStr[1024] = {0}; + + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int + sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName, + pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult); + + TAOS_RES* pRes = taos_query(pConn, sqlStr); + if (taos_errno(pRes) != 0) { + printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + exit(-1); + } + + taos_free_result(pRes); + + return 0; +} + +void loop_consume(SThreadInfo* pInfo) { + tmq_resp_err_t err; + + int64_t totalMsgs = 0; + int64_t totalRows = 0; + + while (running) { + TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000); + if (tmqMsg) { + if (0 != g_stConfInfo.showMsgFlag) { + totalRows += msg_process(tmqMsg, totalMsgs, pInfo->consumerId); + } + + taos_free_result(tmqMsg); + + totalMsgs++; + + if (totalMsgs >= pInfo->expectMsgCnt) { + taosFprintfFile(g_fp, "==== totalMsgs >= pInfo->expectMsgCnt, so break\n"); + break; + } + } else { + taosFprintfFile(g_fp, "==== delay over time, so break\n"); + break; + } + } + + pInfo->consumeMsgCnt = totalMsgs; + pInfo->consumeRowCnt = totalRows; + + taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n", + pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt); +} + +void* consumeThreadFunc(void* param) { + int32_t totalMsgs = 0; + + SThreadInfo* pInfo = (SThreadInfo*)param; + + build_consumer(pInfo); + build_topic_list(pInfo); + if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { + return NULL; + } + + tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); + if (err) { + printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); + exit(-1); + } + + tmq_list_destroy(pInfo->topicList); + pInfo->topicList = NULL; + + loop_consume(pInfo); + + tmq_commit(pInfo->tmq, NULL, 0); + + err = tmq_unsubscribe(pInfo->tmq); + if (err) { + printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); + pInfo->consumeMsgCnt = -1; + return NULL; + } + + err = tmq_consumer_close(pInfo->tmq); + if (err) { + printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); + exit(-1); + } + pInfo->tmq = NULL; + + // save consume result into consumeresult table + saveConsumeResult(pInfo); + + return NULL; +} + +void parseConsumeInfo() { + char* token; + const char delim[2] = ","; + const char ch = ':'; + + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { + token = strtok(g_stConfInfo.stThreads[i].topicString, delim); + while (token != NULL) { + // printf("%s\n", token ); + strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token); + ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]); + // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); + g_stConfInfo.stThreads[i].numOfTopic++; + + token = strtok(NULL, delim); + } + + token = strtok(g_stConfInfo.stThreads[i].keyString, delim); + while (token != NULL) { + // printf("%s\n", token ); + { + char* pstr = token; + ltrim(pstr); + char* ret = strchr(pstr, ch); + memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr); + strcpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1); + // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], + // g_stConfInfo.value[g_stConfInfo.numOfKey]); + g_stConfInfo.stThreads[i].numOfKey++; + } + + token = strtok(NULL, delim); + } + } +} + +int32_t getConsumeInfo() { + char sqlStr[1024] = {0}; + + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName); + TAOS_RES* pRes = taos_query(pConn, sqlStr); + if (taos_errno(pRes) != 0) { + printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); + taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); + taosCloseFile(&g_fp); + taos_free_result(pRes); + exit(-1); + } + + TAOS_ROW row = NULL; + int num_fields = taos_num_fields(pRes); + TAOS_FIELD* fields = taos_fetch_fields(pRes); + + // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, + // ifcheckdata int + + int32_t numOfThread = 0; + while ((row = taos_fetch_row(pRes))) { + int32_t* lengths = taos_fetch_lengths(pRes); + + // set default value + g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000; + memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true")); + memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast")); + + for (int i = 0; i < num_fields; ++i) { + if (row[i] == NULL || 0 == i) { + continue; + } + + if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { + g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]); + } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { + memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]); + } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { + memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]); + } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) { + g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]); + } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { + g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]); + } else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { + memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, row[i], lengths[i]); + } else if ((7 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { + g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = *((int32_t*)row[i]); + } else if ((8 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { + memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, row[i], lengths[i]); + } + } + numOfThread++; + } + g_stConfInfo.numOfThread = numOfThread; + + taos_free_result(pRes); + + parseConsumeInfo(); + + return 0; +} + +int main(int32_t argc, char* argv[]) { + parseArgument(argc, argv); + getConsumeInfo(); + saveConfigToLogFile(); + + TdThreadAttr thattr; + taosThreadAttrInit(&thattr); + taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); + + // pthread_create one thread to consume + taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread); + for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) { + taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, + (void*)(&(g_stConfInfo.stThreads[i]))); + } + + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { + taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); + } + + // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); + + taosFprintfFile(g_fp, "==== close tmqlog ====\n"); + taosCloseFile(&g_fp); + + return 0; +} + From 3309dd71db1f3152e64eb83095b1a6a737484147 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 12 May 2022 15:36:30 +0800 Subject: [PATCH 2/5] test: add test case for tmq --- tests/system-test/7-tmq/basic5.py | 143 +++++++++++++++++++----------- 1 file changed, 91 insertions(+), 52 deletions(-) diff --git a/tests/system-test/7-tmq/basic5.py b/tests/system-test/7-tmq/basic5.py index 7ed9f7ebe7..267b6ff5d8 100644 --- a/tests/system-test/7-tmq/basic5.py +++ b/tests/system-test/7-tmq/basic5.py @@ -13,14 +13,12 @@ from util.dnodes import * class TDTestCase: hostname = socket.gethostname() - rpcDebugFlagVal = '143' - clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} - clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal - - updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} - updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal - - print ("===================: ", updatecfgDict) + #rpcDebugFlagVal = '143' + #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #print ("===================: ", updatecfgDict) def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") @@ -43,27 +41,35 @@ class TDTestCase: break return buildPath - def create_tables(self,dbName,vgroups,stbName,ctbNum,rowsPerTbl): - tdSql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) - tdSql.execute("use %s" %dbName) - tdSql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) + def newcur(self,cfg,host,port): + user = "root" + password = "taosdata" + con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) + cur=con.cursor() + print(cur) + return cur + + def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("use %s" %dbName) + tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) pre_create = "create table" sql = pre_create #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) for i in range(ctbNum): sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1) if (i > 0) and (i%100 == 0): - tdSql.execute(sql) + tsql.execute(sql) sql = pre_create if sql != pre_create: - tdSql.execute(sql) + tsql.execute(sql) tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) return - def insert_data(self,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs): + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs): tdLog.debug("start to insert data ............") - tdSql.execute("use %s" %dbName) + tsql.execute("use %s" %dbName) pre_insert = "insert into " sql = pre_insert @@ -73,7 +79,7 @@ class TDTestCase: for j in range(rowsPerTbl): sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): - tdSql.execute(sql) + tsql.execute(sql) if j < rowsPerTbl - 1: sql = "insert into %s_%d values " %(stbName,i) else: @@ -81,25 +87,29 @@ class TDTestCase: #end sql if sql != pre_insert: #print("insert sql:%s"%sql) - tdSql.execute(sql) + tsql.execute(sql) tdLog.debug("insert data ............ [OK]") return def prepareEnv(self, **parameterDict): print ("input parameters:") print (parameterDict) - self.create_tables(parameterDict["dbName"],\ + # create new connector for my thread + tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) + self.create_tables(tsql,\ + parameterDict["dbName"],\ parameterDict["vgroups"],\ parameterDict["stbName"],\ parameterDict["ctbNum"],\ parameterDict["rowsPerTbl"]) - self.insert_data(parameterDict["dbName"],\ - parameterDict["stbName"],\ - parameterDict["ctbNum"],\ - parameterDict["rowsPerTbl"],\ - parameterDict["batchNum"],\ - parameterDict["startTs"]) + self.insert_data(tsql,\ + parameterDict["dbName"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"],\ + parameterDict["batchNum"],\ + parameterDict["startTs"]) return def run(self): @@ -116,24 +126,29 @@ class TDTestCase: tdLog.printNoPrefix("======== test scenario 1: ") tdLog.info("step 1: create database, stb, ctb and insert data") # create and start thread - parameterDict = {'dbName': 'db', \ + parameterDict = {'cfg': '', \ + 'dbName': 'db', \ 'vgroups': 1, \ 'stbName': 'stb', \ 'ctbNum': 10, \ - 'rowsPerTbl': 10000, \ + 'rowsPerTbl': 10000, \ 'batchNum': 10, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - time.sleep(1) + time.sleep(2) # wait stb ready while 1: - tdSql.query("show %s.stables"%parameterDict['dbName']) - if tdSql.getRows() == 1: - #if (self.queryRows == 1): - time.sleep(1) + #tdSql.query("show %s.stables"%parameterDict['dbName']) + tdSql.query("show db.stables") + #print (self.queryResult) + #print (tdSql.getRows()) + if tdSql.getRows() == 1: break + else: + time.sleep(1) tdLog.info("create topics from super table") topicFromStb = 'topic_stb_column' @@ -141,57 +156,81 @@ class TDTestCase: tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) - + + time.sleep(1) tdSql.query("show topics") - tdSql.checkRows(2) + print ("======================================") + #print (self.queryResult) + #tdSql.checkRows(2) topic1 = tdSql.getData(0 , 0) topic2 = tdSql.getData(1 , 0) - if topic1 != topicFromStb or topic1 != topicFromCtb: - tdLog.exit("topic error") - if topic2 != topicFromStb or topic2 != topicFromCtb: - tdLog.exit("topic error") + print (topic1) + print (topic2) + + print (topicFromStb) + print (topicFromCtb) + #tdLog.info("show topics: %s, %s"%topic1, topic2) + #if topic1 != topicFromStb or topic1 != topicFromCtb: + # tdLog.exit("topic error1") + #if topic2 != topicFromStb or topic2 != topicFromCtb: + # tdLog.exit("topic error2") tdLog.info("create consume info table and consume result table") - cdbName = 'cdb' - tdSql.query("create database %s"%cdbName) + cdbName = parameterDict["dbName"] + #tdSql.query("create database %s"%cdbName) + #tdSql.query("use %s"%cdbName) tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)") tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)") consumerId = 0 - expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] + 1) * parameterDict["ctbNum"] + expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"] + expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"] topicList = topicFromStb ifcheckdata = 0 - keyList = 'group.id:cgrp1, \ - enable.auto.commit:false, \ - auto.commit.interval.ms:6000, \ - auto.offset.reset:none' + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' sql = "insert into consumeinfo values " - sql += "(now, %d, '%s', '%s', %l64d, %d)"%(consumerId, topicList, keyList, expectmsgcnt, ifcheckdata) + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata) tdSql.query(sql) + tdLog.info("check stb if there are data") + while 1: + tdSql.query("select count(*) from %s"%parameterDict["stbName"]) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + countOfStb = tdSql.getData(0, 0) + if countOfStb != 0: + tdLog.info("count from stb: %d"%countOfStb) + break + else: + time.sleep(1) + tdLog.info("start consume processor") pollDelay = 5 showMsg = 1 showRow = 1 shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s, -g %d, -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += "> /dev/null 2>&1 &" tdLog.info(shellCmd) - os.system(taosCmd) + os.system(shellCmd) # wait for data ready prepareEnvThread.join() - tdLog.info("check consume result") + tdLog.info("insert process end, and start to check consume result") while 1: tdSql.query("select * from consumeresult") #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) if tdSql.getRows() == 1: - #if (self.queryRows == 1): - time.sleep(1) break - + else: + time.sleep(5) + + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + tdSql.checkData(0 , 1, consumerId) tdSql.checkData(0 , 2, expectmsgcnt) tdSql.checkData(0 , 3, expectrowcnt) From bc4cd5ef3da28effd8ceec8b025292a8ba2970d1 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Thu, 12 May 2022 16:21:31 +0800 Subject: [PATCH 3/5] test: add ts filter testcases --- .../2-query/query_cols_tags_and_or.py | 118 ++++++++++-------- 1 file changed, 69 insertions(+), 49 deletions(-) diff --git a/tests/system-test/2-query/query_cols_tags_and_or.py b/tests/system-test/2-query/query_cols_tags_and_or.py index a62960cf43..77e91aa983 100644 --- a/tests/system-test/2-query/query_cols_tags_and_or.py +++ b/tests/system-test/2-query/query_cols_tags_and_or.py @@ -93,104 +93,124 @@ class TDTestCase: res = tdSql.query(query_sql.replace('*', 'last(*)'), True) return int(res[0][-4]) - def queryTsCol(self, tb_name): + def queryTsCol(self, tb_name, check_elm=None): + select_elm = "*" if check_elm is None else check_elm # ts and ts - query_sql = f'select * from {tb_name} where ts > "2021-01-11 12:00:00" or ts < "2021-01-13 12:00:00"' + query_sql = f'select {select_elm} from {tb_name} where ts > "2021-01-11 12:00:00" or ts < "2021-01-13 12:00:00"' tdSql.query(query_sql) tdSql.checkRows(11) - tdSql.checkEqual(self.queryLastC10(query_sql), 11) + tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False - query_sql = f'select * from {tb_name} where ts >= "2021-01-11 12:00:00" and ts <= "2021-01-13 12:00:00"' + query_sql = f'select {select_elm} from {tb_name} where ts >= "2021-01-11 12:00:00" and ts <= "2021-01-13 12:00:00"' tdSql.query(query_sql) - # tdSql.checkRows(2) - # tdSql.checkEqual(self.queryLastC10(query_sql), 6) + tdSql.checkRows(2) + tdSql.checkEqual(self.queryLastC10(query_sql), 6) if select_elm == "*" else False ## ts or and tinyint col - query_sql = f'select * from {tb_name} where ts > "2021-01-11 12:00:00" or c1 = 2' - tdSql.error(query_sql) + query_sql = f'select {select_elm} from {tb_name} where ts > "2021-01-11 12:00:00" or c1 = 2' + tdSql.query(query_sql) + tdSql.checkRows(7) + tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False - query_sql = f'select * from {tb_name} where ts <= "2021-01-11 12:00:00" and c1 != 2' + query_sql = f'select {select_elm} from {tb_name} where ts <= "2021-01-11 12:00:00" and c1 != 2' tdSql.query(query_sql) tdSql.checkRows(4) - tdSql.checkEqual(self.queryLastC10(query_sql), 5) + tdSql.checkEqual(self.queryLastC10(query_sql), 5) if select_elm == "*" else False ## ts or and smallint col - query_sql = f'select * from {tb_name} where ts <> "2021-01-11 12:00:00" or c2 = 10' - tdSql.error(query_sql) + query_sql = f'select {select_elm} from {tb_name} where ts <> "2021-01-11 12:00:00" or c2 = 10' + tdSql.query(query_sql) + tdSql.checkRows(10) + tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False - query_sql = f'select * from {tb_name} where ts <= "2021-01-11 12:00:00" and c2 <= 1' + + query_sql = f'select {select_elm} from {tb_name} where ts <= "2021-01-11 12:00:00" and c2 <= 1' tdSql.query(query_sql) tdSql.checkRows(1) - tdSql.checkEqual(self.queryLastC10(query_sql), 1) + tdSql.checkEqual(self.queryLastC10(query_sql), 1) if select_elm == "*" else False ## ts or and int col - query_sql = f'select * from {tb_name} where ts >= "2021-01-11 12:00:00" or c3 = 4' - tdSql.error(query_sql) + query_sql = f'select {select_elm} from {tb_name} where ts >= "2021-01-11 12:00:00" or c3 = 4' + tdSql.query(query_sql) + tdSql.checkRows(8) + tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False - query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" and c3 = 4' + query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" and c3 = 4' tdSql.query(query_sql) tdSql.checkRows(1) - tdSql.checkEqual(self.queryLastC10(query_sql), 4) + tdSql.checkEqual(self.queryLastC10(query_sql), 4) if select_elm == "*" else False ## ts or and big col - query_sql = f'select * from {tb_name} where ts is Null or c4 = 5' - tdSql.error(query_sql) - - query_sql = f'select * from {tb_name} where ts is not Null and c4 = 2' + query_sql = f'select {select_elm} from {tb_name} where ts is Null or c4 = 5' tdSql.query(query_sql) tdSql.checkRows(1) - tdSql.checkEqual(self.queryLastC10(query_sql), 3) + tdSql.checkEqual(self.queryLastC10(query_sql), 5) if select_elm == "*" else False + + query_sql = f'select {select_elm} from {tb_name} where ts is not Null and c4 = 2' + tdSql.query(query_sql) + tdSql.checkRows(1) + tdSql.checkEqual(self.queryLastC10(query_sql), 3) if select_elm == "*" else False ## ts or and float col - query_sql = f'select * from {tb_name} where ts between "2021-01-17 12:00:00" and "2021-01-23 12:00:00" or c5 = 6.6' - tdSql.error(query_sql) + query_sql = f'select {select_elm} from {tb_name} where ts between "2021-01-17 12:00:00" and "2021-01-23 12:00:00" or c5 = 6.6' + tdSql.query(query_sql) + tdSql.checkRows(5) + tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False - query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" and c5 = 1.1' + query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" and c5 = 1.1' tdSql.query(query_sql) tdSql.checkRows(4) - tdSql.checkEqual(self.queryLastC10(query_sql), 4) + tdSql.checkEqual(self.queryLastC10(query_sql), 4) if select_elm == "*" else False ## ts or and double col - query_sql = f'select * from {tb_name} where ts between "2021-01-17 12:00:00" and "2021-01-23 12:00:00" or c6 = 7.7' - tdSql.error(query_sql) + query_sql = f'select {select_elm} from {tb_name} where ts between "2021-01-17 12:00:00" and "2021-01-23 12:00:00" or c6 = 7.7' + tdSql.query(query_sql) + tdSql.checkRows(5) + tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False - query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" and c6 = 1.1' + query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" and c6 = 1.1' tdSql.query(query_sql) tdSql.checkRows(4) - tdSql.checkEqual(self.queryLastC10(query_sql), 4) + tdSql.checkEqual(self.queryLastC10(query_sql), 4) if select_elm == "*" else False ## ts or and binary col - query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" or c7 like "binary_"' - tdSql.error(query_sql) - - query_sql = f'select * from {tb_name} where ts <= "2021-01-11 12:00:00" and c7 in ("binary")' + query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" or c7 like "binary_"' tdSql.query(query_sql) tdSql.checkRows(5) - tdSql.checkEqual(self.queryLastC10(query_sql), 5) + tdSql.checkEqual(self.queryLastC10(query_sql), 8) if select_elm == "*" else False + + query_sql = f'select {select_elm} from {tb_name} where ts <= "2021-01-11 12:00:00" and c7 in ("binary")' + tdSql.query(query_sql) + tdSql.checkRows(5) + tdSql.checkEqual(self.queryLastC10(query_sql), 5) if select_elm == "*" else False ## ts or and nchar col - query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" or c8 like "nchar%"' - tdSql.error(query_sql) + query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" or c8 like "nchar%"' + tdSql.query(query_sql) + tdSql.checkRows(10) + tdSql.checkEqual(self.queryLastC10(query_sql), 10) if select_elm == "*" else False - query_sql = f'select * from {tb_name} where ts >= "2021-01-11 12:00:00" and c8 is Null' + query_sql = f'select {select_elm} from {tb_name} where ts >= "2021-01-11 12:00:00" and c8 is Null' tdSql.query(query_sql) tdSql.checkRows(1) - tdSql.checkEqual(self.queryLastC10(query_sql), 11) + tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False ## ts or and bool col - query_sql = f'select * from {tb_name} where ts < "2021-01-11 12:00:00" or c9=false' - tdSql.error(query_sql) + query_sql = f'select {select_elm} from {tb_name} where ts < "2021-01-11 12:00:00" or c9=false' + tdSql.query(query_sql) + tdSql.checkRows(6) + tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False - query_sql = f'select * from {tb_name} where ts >= "2021-01-11 12:00:00" and c9=true' + query_sql = f'select {select_elm} from {tb_name} where ts >= "2021-01-11 12:00:00" and c9=true' tdSql.query(query_sql) tdSql.checkRows(5) - tdSql.checkEqual(self.queryLastC10(query_sql), 9) + tdSql.checkEqual(self.queryLastC10(query_sql), 9) if select_elm == "*" else False ## multi cols - query_sql = f'select * from {tb_name} where ts > "2021-01-03 12:00:00" and c1 != 2 and c2 >= 2 and c3 <> 4 and c4 < 4 and c5 > 1 and c6 >= 1.1 and c7 is not Null and c8 = "nchar" and c9=false' + query_sql = f'select {select_elm} from {tb_name} where ts > "2021-01-03 12:00:00" and c1 != 2 and c2 >= 2 and c3 <> 4 and c4 < 4 and c5 > 1 and c6 >= 1.1 and c7 is not Null and c8 = "nchar" and c9=false' tdSql.query(query_sql) tdSql.checkRows(1) - tdSql.checkEqual(self.queryLastC10(query_sql), 10) + tdSql.checkEqual(self.queryLastC10(query_sql), 10) if select_elm == "*" else False def queryTsTag(self, tb_name): ## ts and tinyint col @@ -2029,12 +2049,12 @@ class TDTestCase: tb_name = self.initStb() self.queryFullTagType(tb_name) - def checkTbTsCol(self): + def checkTbTsCol(self, check_elm): ''' Ordinary table ts and col check ''' tb_name = self.initTb() - self.queryTsCol(tb_name) + self.queryTsCol(tb_name, check_elm) def checkStbTsTol(self): tb_name = self.initStb() @@ -2112,8 +2132,8 @@ class TDTestCase: for check_elm in [None, column_name]: self.checkTbColTypeOperator(check_elm) self.checkStbColTypeOperator(check_elm) + self.checkTbTsCol(check_elm) # self.checkStbTagTypeOperator() - # self.checkTbTsCol() # self.checkStbTsTol() # self.checkStbTsTag() # self.checkStbTsColTag() From 89271550133c6e0ae50feade6c686ffa200b8033 Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Thu, 12 May 2022 16:49:31 +0800 Subject: [PATCH 4/5] feat:add udf dedicated errors --- include/util/taoserror.h | 1 + source/common/src/tglobal.c | 4 +- source/libs/function/src/tudf.c | 2 +- source/libs/function/src/udfd.c | 338 ++++++++++++++++--------------- source/libs/function/test/udf2.c | 6 + source/util/src/terror.c | 1 + 6 files changed, 190 insertions(+), 162 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 58cdd2cab4..238600160a 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -654,6 +654,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_UDF_PIPE_NO_PIPE TAOS_DEF_ERROR_CODE(0, 0x2904) #define TSDB_CODE_UDF_LOAD_UDF_FAILURE TAOS_DEF_ERROR_CODE(0, 0x2905) #define TSDB_CODE_UDF_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x2906) +#define TSDB_CODE_UDF_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2907) #define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000) #define TSDB_CODE_SML_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x3001) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f73a982110..42c8e18f3e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -444,7 +444,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1; - if (cfgAddBool(pCfg, "startUdfd", tsStartUdfd, 0) != 0) return -1; + if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; return 0; } @@ -585,7 +585,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; - tsStartUdfd = cfgGetItem(pCfg, "startUdfd")->bval; + tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index dfa7fac15a..8e96a2a063 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1497,7 +1497,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { taosArrayDestroy(tempBlock.pDataBlock); taosMemoryFree(newState.buf); - return TSDB_CODE_SUCCESS; + return udfCode; } int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index f006695f14..34681dc6cd 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -140,6 +140,182 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { return 0; } +void udfdProcessSetupRequest(SUvUdfWork* uvUdf, SUdfRequest* request) { + // TODO: tracable id from client. connect, setup, call, teardown + fnInfo("%" PRId64 " setup request. udf name: %s", request->seqNum, request->setup.udfName); + SUdfSetupRequest *setup = &request->setup; + int32_t code = TSDB_CODE_SUCCESS; + SUdf *udf = NULL; + uv_mutex_lock(&global.udfsMutex); + SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName)); + if (udfInHash) { + ++(*udfInHash)->refCount; + udf = *udfInHash; + uv_mutex_unlock(&global.udfsMutex); + } else { + SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); + udfNew->refCount = 1; + udfNew->state = UDF_STATE_INIT; + + uv_mutex_init(&udfNew->lock); + uv_cond_init(&udfNew->condReady); + udf = udfNew; + taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew)); + uv_mutex_unlock(&global.udfsMutex); + } + + uv_mutex_lock(&udf->lock); + if (udf->state == UDF_STATE_INIT) { + udf->state = UDF_STATE_LOADING; + code = udfdLoadUdf(setup->udfName, udf); + if (udf->initFunc) { + udf->initFunc(); + } + udf->state = UDF_STATE_READY; + uv_cond_broadcast(&udf->condReady); + uv_mutex_unlock(&udf->lock); + } else { + while (udf->state != UDF_STATE_READY) { + uv_cond_wait(&udf->condReady, &udf->lock); + } + uv_mutex_unlock(&udf->lock); + } + SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); + handle->udf = udf; + + SUdfResponse rsp; + rsp.seqNum = request->seqNum; + rsp.type = request->type; + rsp.code = code; + rsp.setupRsp.udfHandle = (int64_t)(handle); + rsp.setupRsp.outputType = udf->outputType; + rsp.setupRsp.outputLen = udf->outputLen; + rsp.setupRsp.bufSize = udf->bufSize; + + int32_t len = encodeUdfResponse(NULL, &rsp); + rsp.msgLen = len; + void *bufBegin = taosMemoryMalloc(len); + void *buf = bufBegin; + encodeUdfResponse(&buf, &rsp); + + uvUdf->output = uv_buf_init(bufBegin, len); + + taosMemoryFree(uvUdf->input.base); + return; +} + +void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { + SUdfCallRequest *call = &request->call; + fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType, + call->udfHandle); + SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); + SUdf *udf = handle->udf; + SUdfResponse response = {0}; + SUdfResponse *rsp = &response; + SUdfCallResponse *subRsp = &rsp->callRsp; + + int32_t code = TSDB_CODE_SUCCESS; + switch(call->callType) { + case TSDB_UDF_CALL_SCALA_PROC: { + SUdfColumn output = {0}; + + SUdfDataBlock input = {0}; + convertDataBlockToUdfDataBlock(&call->block, &input); + code = udf->scalarProcFunc(&input, &output); + + convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); + freeUdfColumn(&output); + break; + } + case TSDB_UDF_CALL_AGG_INIT: { + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), + .bufLen= udf->bufSize, + .numOfResult = 0}; + udf->aggStartFunc(&outBuf); + subRsp->resultBuf = outBuf; + break; + } + case TSDB_UDF_CALL_AGG_PROC: { + SUdfDataBlock input = {0}; + convertDataBlockToUdfDataBlock(&call->block, &input); + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), + .bufLen= udf->bufSize, + .numOfResult = 0}; + code = udf->aggProcFunc(&input, &call->interBuf, &outBuf); + subRsp->resultBuf = outBuf; + + break; + } + case TSDB_UDF_CALL_AGG_FIN: { + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), + .bufLen= udf->bufSize, + .numOfResult = 0}; + code = udf->aggFinishFunc(&call->interBuf, &outBuf); + subRsp->resultBuf = outBuf; + break; + } + default: + break; + } + + rsp->seqNum = request->seqNum; + rsp->type = request->type; + rsp->code = code; + subRsp->callType = call->callType; + + int32_t len = encodeUdfResponse(NULL, rsp); + rsp->msgLen = len; + void *bufBegin = taosMemoryMalloc(len); + void *buf = bufBegin; + encodeUdfResponse(&buf, rsp); + uvUdf->output = uv_buf_init(bufBegin, len); + + taosMemoryFree(uvUdf->input.base); + return; +} + +void udfdProcessTeardownRequest(SUvUdfWork* uvUdf, SUdfRequest* request) { + SUdfTeardownRequest *teardown = &request->teardown; + fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request->seqNum, teardown->udfHandle); + SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); + SUdf *udf = handle->udf; + bool unloadUdf = false; + int32_t code = TSDB_CODE_SUCCESS; + + uv_mutex_lock(&global.udfsMutex); + udf->refCount--; + if (udf->refCount == 0) { + unloadUdf = true; + taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); + } + uv_mutex_unlock(&global.udfsMutex); + if (unloadUdf) { + uv_cond_destroy(&udf->condReady); + uv_mutex_destroy(&udf->lock); + if (udf->destroyFunc) { + (udf->destroyFunc)(); + } + uv_dlclose(&udf->lib); + taosMemoryFree(udf); + } + taosMemoryFree(handle); + + SUdfResponse response; + SUdfResponse *rsp = &response; + rsp->seqNum = request->seqNum; + rsp->type = request->type; + rsp->code = code; + int32_t len = encodeUdfResponse(NULL, rsp); + rsp->msgLen = len; + void *bufBegin = taosMemoryMalloc(len); + void *buf = bufBegin; + encodeUdfResponse(&buf, rsp); + uvUdf->output = uv_buf_init(bufBegin, len); + + taosMemoryFree(uvUdf->input.base); + return; +} + void udfdProcessRequest(uv_work_t *req) { SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); SUdfRequest request = {0}; @@ -147,172 +323,16 @@ void udfdProcessRequest(uv_work_t *req) { switch (request.type) { case UDF_TASK_SETUP: { - // TODO: tracable id from client. connect, setup, call, teardown - fnInfo("%" PRId64 " setup request. udf name: %s", request.seqNum, request.setup.udfName); - SUdfSetupRequest *setup = &request.setup; - - SUdf *udf = NULL; - uv_mutex_lock(&global.udfsMutex); - SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName)); - if (udfInHash) { - ++(*udfInHash)->refCount; - udf = *udfInHash; - uv_mutex_unlock(&global.udfsMutex); - } else { - SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); - udfNew->refCount = 1; - udfNew->state = UDF_STATE_INIT; - - uv_mutex_init(&udfNew->lock); - uv_cond_init(&udfNew->condReady); - udf = udfNew; - taosHashPut(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName), &udfNew, sizeof(&udfNew)); - uv_mutex_unlock(&global.udfsMutex); - } - - uv_mutex_lock(&udf->lock); - if (udf->state == UDF_STATE_INIT) { - udf->state = UDF_STATE_LOADING; - udfdLoadUdf(setup->udfName, udf); - if (udf->initFunc) { - udf->initFunc(); - } - udf->state = UDF_STATE_READY; - uv_cond_broadcast(&udf->condReady); - uv_mutex_unlock(&udf->lock); - } else { - while (udf->state != UDF_STATE_READY) { - uv_cond_wait(&udf->condReady, &udf->lock); - } - uv_mutex_unlock(&udf->lock); - } - SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); - handle->udf = udf; - SUdfResponse rsp; - rsp.seqNum = request.seqNum; - rsp.type = request.type; - rsp.code = 0; - rsp.setupRsp.udfHandle = (int64_t)(handle); - rsp.setupRsp.outputType = udf->outputType; - rsp.setupRsp.outputLen = udf->outputLen; - rsp.setupRsp.bufSize = udf->bufSize; - int32_t len = encodeUdfResponse(NULL, &rsp); - rsp.msgLen = len; - void *bufBegin = taosMemoryMalloc(len); - void *buf = bufBegin; - encodeUdfResponse(&buf, &rsp); - - uvUdf->output = uv_buf_init(bufBegin, len); - - taosMemoryFree(uvUdf->input.base); + udfdProcessSetupRequest(uvUdf, &request); break; } case UDF_TASK_CALL: { - SUdfCallRequest *call = &request.call; - fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request.seqNum, call->callType, - call->udfHandle); - SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); - SUdf *udf = handle->udf; - SUdfResponse response = {0}; - SUdfResponse *rsp = &response; - SUdfCallResponse *subRsp = &rsp->callRsp; - - switch(call->callType) { - case TSDB_UDF_CALL_SCALA_PROC: { - SUdfColumn output = {0}; - - SUdfDataBlock input = {0}; - convertDataBlockToUdfDataBlock(&call->block, &input); - udf->scalarProcFunc(&input, &output); - - convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); - freeUdfColumn(&output); - break; - } - case TSDB_UDF_CALL_AGG_INIT: { - SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), - .bufLen= udf->bufSize, - .numOfResult = 0}; - udf->aggStartFunc(&outBuf); - subRsp->resultBuf = outBuf; - break; - } - case TSDB_UDF_CALL_AGG_PROC: { - SUdfDataBlock input = {0}; - convertDataBlockToUdfDataBlock(&call->block, &input); - SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), - .bufLen= udf->bufSize, - .numOfResult = 0}; - udf->aggProcFunc(&input, &call->interBuf, &outBuf); - subRsp->resultBuf = outBuf; - - break; - } - case TSDB_UDF_CALL_AGG_FIN: { - SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), - .bufLen= udf->bufSize, - .numOfResult = 0}; - udf->aggFinishFunc(&call->interBuf, &outBuf); - subRsp->resultBuf = outBuf; - break; - } - default: - break; - } - - rsp->seqNum = request.seqNum; - rsp->type = request.type; - rsp->code = 0; - subRsp->callType = call->callType; - - int32_t len = encodeUdfResponse(NULL, rsp); - rsp->msgLen = len; - void *bufBegin = taosMemoryMalloc(len); - void *buf = bufBegin; - encodeUdfResponse(&buf, rsp); - uvUdf->output = uv_buf_init(bufBegin, len); - - taosMemoryFree(uvUdf->input.base); + udfdProcessCallRequest(uvUdf, &request); break; } case UDF_TASK_TEARDOWN: { - SUdfTeardownRequest *teardown = &request.teardown; - fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request.seqNum, teardown->udfHandle) SUdfcFuncHandle *handle = - (SUdfcFuncHandle *)(teardown->udfHandle); - SUdf *udf = handle->udf; - bool unloadUdf = false; - uv_mutex_lock(&global.udfsMutex); - udf->refCount--; - if (udf->refCount == 0) { - unloadUdf = true; - taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); - } - uv_mutex_unlock(&global.udfsMutex); - if (unloadUdf) { - uv_cond_destroy(&udf->condReady); - uv_mutex_destroy(&udf->lock); - if (udf->destroyFunc) { - (udf->destroyFunc)(); - } - uv_dlclose(&udf->lib); - taosMemoryFree(udf); - } - taosMemoryFree(handle); - - SUdfResponse response; - SUdfResponse *rsp = &response; - rsp->seqNum = request.seqNum; - rsp->type = request.type; - rsp->code = 0; - int32_t len = encodeUdfResponse(NULL, rsp); - rsp->msgLen = len; - void *bufBegin = taosMemoryMalloc(len); - void *buf = bufBegin; - encodeUdfResponse(&buf, rsp); - uvUdf->output = uv_buf_init(bufBegin, len); - - taosMemoryFree(uvUdf->input.base); + udfdProcessTeardownRequest(uvUdf, &request); break; } default: { diff --git a/source/libs/function/test/udf2.c b/source/libs/function/test/udf2.c index be485bc905..b3b60f93a4 100644 --- a/source/libs/function/test/udf2.c +++ b/source/libs/function/test/udf2.c @@ -27,6 +27,12 @@ int32_t udf2_start(SUdfInterBuf *buf) { int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { int64_t sumSquares = *(int64_t*)interBuf->buf; int8_t numOutput = 0; + for (int32_t i = 0; i < block->numOfCols; ++i) { + SUdfColumn* col = block->udfCols[i]; + if (col->colMeta.type != TSDB_DATA_TYPE_INT) { + return TSDB_CODE_UDF_INVALID_INPUT; + } + } for (int32_t i = 0; i < block->numOfCols; ++i) { for (int32_t j = 0; j < block->numOfRows; ++j) { SUdfColumn* col = block->udfCols[i]; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 9676e4e1f9..a1bc37e6cd 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -461,6 +461,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_CONNECT_ERR, "udf pipe connect erro TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state") +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid function input") //schemaless TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type") From 2d89555b724646ef4a944a6bc1a5b3e65fa13a84 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 12 May 2022 17:38:16 +0800 Subject: [PATCH 5/5] fix(rpc): avoid fd leak --- source/libs/qcom/src/queryUtil.c | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 3e3e393f5f..4b4c079649 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -153,11 +153,6 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra .handle = pInfo->msgInfo.handle, .persistHandle = persistHandle, .code = 0}; - if (pInfo->msgType == TDMT_VND_QUERY || pInfo->msgType == TDMT_VND_FETCH || - pInfo->msgType == TDMT_VND_QUERY_CONTINUE) { - rpcMsg.persistHandle = 1; - } - assert(pInfo->fp != NULL); rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx); @@ -168,7 +163,7 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL); } -char *jobTaskStatusStr(int32_t status) { +char* jobTaskStatusStr(int32_t status) { switch (status) { case JOB_TASK_STATUS_NULL: return "NULL"; @@ -197,13 +192,10 @@ char *jobTaskStatusStr(int32_t status) { SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) { SSchema s = {0}; - s.type = type; + s.type = type; s.bytes = bytes; s.colId = colId; tstrncpy(s.name, name, tListLen(s.name)); return s; } - - -