From ef05b1bc956079ccd824640d566f4bf2134f116d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 21 Apr 2022 17:56:32 +0800 Subject: [PATCH 1/3] fix: memory leak --- source/client/inc/clientInt.h | 1 + source/client/src/clientEnv.c | 51 ++++++++++++++-------------------- source/client/src/clientMain.c | 10 +++++++ source/os/src/osFile.c | 2 +- source/util/src/tlog.c | 2 +- 5 files changed, 34 insertions(+), 32 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 9363cf00a2..0082b5aeea 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -218,6 +218,7 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo); void setResPrecision(SReqResultInfo* pResInfo, int32_t precision); int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4); void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); +void doFreeReqResultInfo(SReqResultInfo* pResInfo); static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { SMqRspObj* msg = (SMqRspObj*)res; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index ad736fb195..eaf5894d89 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -30,15 +30,15 @@ #define TSC_VAR_RELEASED 0 SAppInfo appInfo; -int32_t clientReqRefPool = -1; +int32_t clientReqRefPool = -1; int32_t clientConnRefPool = -1; -static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; -volatile int32_t tscInitRes = 0; +static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; +volatile int32_t tscInitRes = 0; static void registerRequest(SRequestObj *pRequest) { STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id); - + assert(pTscObj != NULL); // connection has been released already, abort creating request. @@ -49,8 +49,8 @@ static void registerRequest(SRequestObj *pRequest) { if (pTscObj->pAppInfo) { SInstanceSummary *pSummary = &pTscObj->pAppInfo->summary; - int32_t total = atomic_add_fetch_64((int64_t*)&pSummary->totalRequests, 1); - int32_t currentInst = atomic_add_fetch_64((int64_t*)&pSummary->currentRequests, 1); + int32_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1); + int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1); tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64, pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); @@ -60,16 +60,16 @@ static void registerRequest(SRequestObj *pRequest) { static void deregisterRequest(SRequestObj *pRequest) { assert(pRequest != NULL); - STscObj * pTscObj = pRequest->pTscObj; + STscObj *pTscObj = pRequest->pTscObj; SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary; - int32_t currentInst = atomic_sub_fetch_64((int64_t*)&pActivity->currentRequests, 1); + int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int64_t duration = taosGetTimestampUs() - pRequest->metric.start; tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 " ms, current:%d, app current:%d", - pRequest->self, pTscObj->id, pRequest->requestId, duration/1000, num, currentInst); + pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst); releaseTscObj(pTscObj->id); } @@ -109,12 +109,12 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { } void closeAllRequests(SHashObj *pRequests) { - void *pIter = taosHashIterate(pRequests, NULL); + void *pIter = taosHashIterate(pRequests, NULL); while (pIter != NULL) { int64_t *rid = pIter; - releaseRequest(*rid); - + releaseRequest(*rid); + pIter = taosHashIterate(pRequests, pIter); } } @@ -144,7 +144,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; } - + pObj->pAppInfo = pAppInfo; tstrncpy(pObj->user, user, sizeof(pObj->user)); memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN); @@ -160,13 +160,9 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI return pObj; } -STscObj *acquireTscObj(int64_t rid) { - return (STscObj *)taosAcquireRef(clientConnRefPool, rid); -} +STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); } -int32_t releaseTscObj(int64_t rid) { - return taosReleaseRef(clientConnRefPool, rid); -} +int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); } void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) { assert(pObj != NULL); @@ -189,11 +185,11 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty tsem_init(&pRequest->body.rspSem, 0, 0); registerRequest(pRequest); - + return pRequest; } -static void doFreeReqResultInfo(SReqResultInfo *pResInfo) { +void doFreeReqResultInfo(SReqResultInfo *pResInfo) { taosMemoryFreeClear(pResInfo->pRspMsg); taosMemoryFreeClear(pResInfo->length); taosMemoryFreeClear(pResInfo->row); @@ -216,7 +212,7 @@ static void doDestroyRequest(void *p) { assert(RID_VALID(pRequest->self)); taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); - + taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->sqlstr); taosMemoryFreeClear(pRequest->pDb); @@ -243,14 +239,9 @@ void destroyRequest(SRequestObj *pRequest) { taosRemoveRef(clientReqRefPool, pRequest->self); } -SRequestObj *acquireRequest(int64_t rid) { - return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); -} - -int32_t releaseRequest(int64_t rid) { - return taosReleaseRef(clientReqRefPool, rid); -} +SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); } +int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, rid); } void taos_init_imp(void) { // In the APIs of other program language, taos_cleanup is not available yet. @@ -379,7 +370,7 @@ uint64_t generateRequestId() { } uint64_t id = 0; - + while (true) { int64_t ts = taosGetTimestampMs(); uint64_t pid = taosGetPId(); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 6472dea556..014c5c793a 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -131,6 +131,16 @@ void taos_free_result(TAOS_RES *res) { if (TD_RES_QUERY(res)) { SRequestObj *pRequest = (SRequestObj *)res; destroyRequest(pRequest); + } else if (TD_RES_TMQ(res)) { + SMqRspObj *pRsp = (SMqRspObj *)res; + if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); + if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); + if (pRsp->rsp.blockSchema) taosArrayDestroy(pRsp->rsp.blockSchema); + if (pRsp->rsp.blockTbName) taosArrayDestroy(pRsp->rsp.blockTbName); + if (pRsp->rsp.blockTags) taosArrayDestroy(pRsp->rsp.blockTags); + if (pRsp->rsp.blockTagSchema) taosArrayDestroy(pRsp->rsp.blockTagSchema); + pRsp->resInfo.pRspMsg = NULL; + doFreeReqResultInfo(&pRsp->resInfo); } } diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 3d3d13fcb6..4f2f99dec7 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -380,7 +380,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) { #if FILE_WITH_LOCK taosThreadRwlockWrlock(&(pFile->rwlock)); #endif - /*assert(pFile->fd >= 0); // Please check if you have closed the file.*/ + assert(pFile->fd >= 0); // Please check if you have closed the file. int64_t nleft = count; int64_t nwritten = 0; diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 3dce260b10..822e2fbe61 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -221,7 +221,7 @@ static void *taosThreadToOpenNewFile(void *param) { tsLogObj.logHandle->pFile = pFile; tsLogObj.lines = 0; tsLogObj.openInProgress = 0; - taosSsleep(3); + taosSsleep(10); taosCloseLogByFd(pOldFile); uInfo(" new log file:%d is opened", tsLogObj.flag); From f06407b9d9a2b3e64a98569c78f67da14ebd263f Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 21 Apr 2022 19:55:45 +0800 Subject: [PATCH 2/3] [test: modify consume tool] --- tests/test/c/tmqSim.c | 304 ++++++++++++++++++++++++------------------ 1 file changed, 177 insertions(+), 127 deletions(-) diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index d3bed600dd..af7944eb27 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -31,46 +31,49 @@ #define NC "\033[0m" #define min(a, b) (((a) < (b)) ? (a) : (b)) -#define MAX_SQL_STR_LEN (1024 * 1024) -#define MAX_ROW_STR_LEN (16 * 1024) -#define MAX_CONSUMER_THREAD_CNT (16) +#define MAX_SQL_STR_LEN (1024 * 1024) +#define MAX_ROW_STR_LEN (16 * 1024) +#define MAX_CONSUMER_THREAD_CNT (16) typedef struct { - TdThread thread; - int32_t consumerId; + TdThread thread; + int32_t consumerId; - int32_t ifCheckData; - int64_t expectMsgCnt; + int32_t ifCheckData; + int64_t expectMsgCnt; + + int64_t consumeMsgCnt; + int64_t consumeRowCnt; + int32_t checkresult; - int64_t consumeMsgCnt; - int32_t checkresult; + char topicString[1024]; + char keyString[1024]; - char topicString[1024]; - char keyString[1024]; + int32_t numOfTopic; + char topics[32][64]; - int32_t numOfTopic; - char topics[32][64]; - - int32_t numOfKey; - char key[32][64]; - char value[32][64]; + int32_t numOfKey; + char key[32][64]; + char value[32][64]; tmq_t* tmq; tmq_list_t* topicList; - + } SThreadInfo; typedef struct { // input from argvs - char dbName[32]; - int32_t showMsgFlag; - int32_t consumeDelay; // unit s - int32_t numOfThread; - SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; + char cdbName[32]; + char dbName[32]; + int32_t showMsgFlag; + int32_t showRowFlag; + int32_t consumeDelay; // unit s + int32_t numOfThread; + SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; } SConfInfo; static SConfInfo g_stConfInfo; -TdFilePtr g_fp = NULL; +TdFilePtr g_fp = NULL; // char* g_pRowValue = NULL; // TdFilePtr g_fp = NULL; @@ -85,51 +88,62 @@ static void printHelp() { printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default "); printf("%s%s\n", indent, "-g"); printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag); + printf("%s%s\n", indent, "-r"); + printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag); printf("%s%s\n", indent, "-y"); printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay); exit(EXIT_SUCCESS); } + void initLogFile() { // FILE *fp = fopen(g_stConfInfo.resultFileName, "a"); - TdFilePtr pFile = taosOpenFile("./tmqlog.txt", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM); + char file[256]; + sprintf(file, "%s/../log/tmqlog.txt", configDir); + TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM); if (NULL == pFile) { fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt"); - exit - 1; + exit -1; }; g_fp = pFile; +} + +void saveConfigToLogFile() { time_t tTime = taosGetTimestampSec(); struct tm tm = *taosLocalTime(&tTime, NULL); - taosFprintfFile(pFile, "###################################################################\n"); - taosFprintfFile(pFile, "# configDir: %s\n", configDir); - taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName); - taosFprintfFile(pFile, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag); - taosFprintfFile(pFile, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay); + taosFprintfFile(g_fp, "###################################################################\n"); + taosFprintfFile(g_fp, "# configDir: %s\n", configDir); + taosFprintfFile(g_fp, "# dbName: %s\n", g_stConfInfo.dbName); + taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName); + taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag); + taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag); + taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay); - for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { - taosFprintfFile(pFile, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); - taosFprintfFile(pFile, " Topics: "); - for (int i = 0; i < g_stConfInfo.stThreads[i].numOfTopic; i++) { - taosFprintfFile(pFile, "%s, ", g_stConfInfo.stThreads[i].topics[i]); + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { + taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); + taosFprintfFile(g_fp, " Topics: "); + for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfTopic; i++) { + taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[i]); } - taosFprintfFile(pFile, "\n"); - taosFprintfFile(pFile, " Key: "); - for (int i = 0; i < g_stConfInfo.stThreads[i].numOfKey; i++) { - taosFprintfFile(pFile, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]); + taosFprintfFile(g_fp, "\n"); + taosFprintfFile(g_fp, " Key: "); + for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfKey; i++) { + taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]); } - taosFprintfFile(pFile, "\n"); + taosFprintfFile(g_fp, "\n"); } - - taosFprintfFile(pFile, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, - tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); - taosFprintfFile(pFile, "###################################################################\n"); + + taosFprintfFile(g_fp, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, + tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); + taosFprintfFile(g_fp, "###################################################################\n"); } void parseArgument(int32_t argc, char* argv[]) { memset(&g_stConfInfo, 0, sizeof(SConfInfo)); g_stConfInfo.showMsgFlag = 0; + g_stConfInfo.showRowFlag = 0; g_stConfInfo.consumeDelay = 5; for (int32_t i = 1; i < argc; i++) { @@ -138,10 +152,14 @@ void parseArgument(int32_t argc, char* argv[]) { exit(0); } else if (strcmp(argv[i], "-d") == 0) { strcpy(g_stConfInfo.dbName, argv[++i]); + } else if (strcmp(argv[i], "-w") == 0) { + strcpy(g_stConfInfo.cdbName, argv[++i]); } else if (strcmp(argv[i], "-c") == 0) { strcpy(configDir, argv[++i]); } else if (strcmp(argv[i], "-g") == 0) { g_stConfInfo.showMsgFlag = atol(argv[++i]); + } else if (strcmp(argv[i], "-r") == 0) { + g_stConfInfo.showRowFlag = atol(argv[++i]); } else if (strcmp(argv[i], "-y") == 0) { g_stConfInfo.consumeDelay = atol(argv[++i]); } else { @@ -150,11 +168,17 @@ void parseArgument(int32_t argc, char* argv[]) { } } + initLogFile(); + + taosFprintfFile(g_fp, "====parseArgument() success\n"); + #if 1 pPrint("%s configDir:%s %s", GREEN, configDir, NC); pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC); + pPrint("%s cdbName:%s %s", GREEN, g_stConfInfo.cdbName, NC); pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC); pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); + pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC); #endif } @@ -180,24 +204,29 @@ void ltrim(char* str) { // return str; } -static int running = 1; -static void msg_process(TAOS_RES* msg, int32_t msgIndex, int32_t threadLable) { +static int running = 1; +static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) { char buf[1024]; + int32_t totalRows = 0; - // printf("topic: %s\n", tmq_get_topic_name(msg)); - // printf("vg:%d\n", tmq_get_vgroup_id(msg)); - taosFprintfFile(g_fp, "msg index:%d, threadLable: %d\n", msgIndex, threadLable); - taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg)); - + //printf("topic: %s\n", tmq_get_topic_name(msg)); + //printf("vg:%d\n", tmq_get_vgroup_id(msg)); + taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", msgIndex, threadLable); + taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg)); + while (1) { TAOS_ROW row = taos_fetch_row(msg); if (row == NULL) break; - TAOS_FIELD* fields = taos_fetch_fields(msg); - int32_t numOfFields = taos_field_count(msg); - // taos_print_row(buf, row, fields, numOfFields); - // printf("%s\n", buf); - // taosFprintfFile(g_fp, "%s\n", buf); + if (0 != g_stConfInfo.showRowFlag) { + TAOS_FIELD* fields = taos_fetch_fields(msg); + int32_t numOfFields = taos_field_count(msg); + taos_print_row(buf, row, fields, numOfFields); + taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); + } + totalRows++; } + + return totalRows; } int queryDB(TAOS* taos, char* command) { @@ -213,31 +242,43 @@ int queryDB(TAOS* taos, char* command) { return 0; } -void build_consumer(SThreadInfo* pInfo) { - char sqlStr[1024] = {0}; - - TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - sprintf(sqlStr, "use %s", g_stConfInfo.dbName); - TAOS_RES* pRes = taos_query(pConn, sqlStr); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - exit(-1); - } - taos_free_result(pRes); +static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets) { + printf("tmq_commit_cb_print() commit %d\n", resp); +} +void build_consumer(SThreadInfo *pInfo) { tmq_conf_t* conf = tmq_conf_new(); - // tmq_conf_set(conf, "group.id", "tg2"); + + //tmq_conf_set(conf, "td.connect.ip", "localhost"); + //tmq_conf_set(conf, "td.connect.port", "6030"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + + tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); + + tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print); + + // tmq_conf_set(conf, "group.id", "cgrp1"); for (int32_t i = 0; i < pInfo->numOfKey; i++) { tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]); } + + //tmq_conf_set(conf, "client.id", "c-001"); + + //tmq_conf_set(conf, "enable.auto.commit", "true"); + //tmq_conf_set(conf, "enable.auto.commit", "false"); + + //tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); + + //tmq_conf_set(conf, "auto.offset.reset", "none"); + //tmq_conf_set(conf, "auto.offset.reset", "earliest"); + //tmq_conf_set(conf, "auto.offset.reset", "latest"); + pInfo->tmq = tmq_consumer_new(conf, NULL, 0); return; } -void build_topic_list(SThreadInfo* pInfo) { +void build_topic_list(SThreadInfo *pInfo) { pInfo->topicList = tmq_list_new(); // tmq_list_append(topic_list, "test_stb_topic_1"); for (int32_t i = 0; i < pInfo->numOfTopic; i++) { @@ -246,45 +287,49 @@ void build_topic_list(SThreadInfo* pInfo) { return; } -int32_t saveConsumeResult(SThreadInfo* pInfo) { +int32_t saveConsumeResult(SThreadInfo *pInfo) { char sqlStr[1024] = {0}; - + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); assert(pConn != NULL); - + // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int - sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %d)", g_stConfInfo.dbName, - pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->checkresult); - + sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", + g_stConfInfo.cdbName, + pInfo->consumerId, + pInfo->consumeMsgCnt, + pInfo->consumeRowCnt, + pInfo->checkresult); + TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); exit(-1); } - + taos_free_result(pRes); return 0; } -void loop_consume(SThreadInfo* pInfo) { +void loop_consume(SThreadInfo *pInfo) { tmq_resp_err_t err; - + int64_t totalMsgs = 0; - // int64_t totalRows = 0; + int64_t totalRows = 0; while (running) { TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000); - if (tmqMsg) { + if (tmqMsg) { if (0 != g_stConfInfo.showMsgFlag) { - msg_process(tmqMsg, totalMsgs, 0); + totalRows += msg_process(tmqMsg, totalMsgs, pInfo->consumerId); } taos_free_result(tmqMsg); totalMsgs++; - + if (totalMsgs >= pInfo->expectMsgCnt) { break; } @@ -292,7 +337,7 @@ void loop_consume(SThreadInfo* pInfo) { break; } } - + err = tmq_consumer_close(pInfo->tmq); if (err) { printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); @@ -300,34 +345,38 @@ void loop_consume(SThreadInfo* pInfo) { } pInfo->consumeMsgCnt = totalMsgs; + pInfo->consumeRowCnt = totalRows; + + taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %"PRId64", consumeRowCnt: %"PRId64"\n", pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt); + } -void* consumeThreadFunc(void* param) { +void *consumeThreadFunc(void *param) { int32_t totalMsgs = 0; - SThreadInfo* pInfo = (SThreadInfo*)param; + SThreadInfo *pInfo = (SThreadInfo *)param; build_consumer(pInfo); build_topic_list(pInfo); - if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { + if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)){ return NULL; } - + tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); if (err) { printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); exit(-1); } - + loop_consume(pInfo); err = tmq_unsubscribe(pInfo->tmq); if (err) { printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); - pInfo->consumeMsgCnt = -1; + pInfo->consumeMsgCnt = -1; return NULL; - } - + } + // save consume result into consumeresult table saveConsumeResult(pInfo); @@ -339,7 +388,7 @@ void parseConsumeInfo() { const char delim[2] = ","; const char ch = ':'; - for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { token = strtok(g_stConfInfo.stThreads[i].topicString, delim); while (token != NULL) { // printf("%s\n", token ); @@ -347,10 +396,10 @@ void parseConsumeInfo() { ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]); // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); g_stConfInfo.stThreads[i].numOfTopic++; - + token = strtok(NULL, delim); } - + token = strtok(g_stConfInfo.stThreads[i].keyString, delim); while (token != NULL) { // printf("%s\n", token ); @@ -364,7 +413,7 @@ void parseConsumeInfo() { // g_stConfInfo.value[g_stConfInfo.numOfKey]); g_stConfInfo.stThreads[i].numOfKey++; } - + token = strtok(NULL, delim); } } @@ -372,47 +421,48 @@ void parseConsumeInfo() { int32_t getConsumeInfo() { char sqlStr[1024] = {0}; - + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); assert(pConn != NULL); - - sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.dbName); + + sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName); TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); + taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); + taosCloseFile(&g_fp); taos_free_result(pRes); exit(-1); - } - - TAOS_ROW row = NULL; - int num_fields = taos_num_fields(pRes); - TAOS_FIELD* fields = taos_fetch_fields(pRes); - - // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, - // ifcheckdata int - + } + + TAOS_ROW row = NULL; + int num_fields = taos_num_fields(pRes); + TAOS_FIELD* fields = taos_fetch_fields(pRes); + + // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int + int32_t numOfThread = 0; while ((row = taos_fetch_row(pRes))) { - int32_t* lengths = taos_fetch_lengths(pRes); - - for (int i = 0; i < num_fields; ++i) { + int32_t* lengths = taos_fetch_lengths(pRes); + + for (int i = 0; i < num_fields; ++i) { if (row[i] == NULL || 0 == i) { continue; } - + if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { - g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]); + g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t *)row[i]); } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]); } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]); } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) { - g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]); + g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t *)row[i]); } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { - g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]); + g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t *)row[i]); } } - numOfThread++; + numOfThread ++; } g_stConfInfo.numOfThread = numOfThread; @@ -423,10 +473,11 @@ int32_t getConsumeInfo() { return 0; } + int main(int32_t argc, char* argv[]) { parseArgument(argc, argv); getConsumeInfo(); - initLogFile(); + saveConfigToLogFile(); TdThreadAttr thattr; taosThreadAttrInit(&thattr); @@ -434,19 +485,18 @@ int main(int32_t argc, char* argv[]) { // pthread_create one thread to consume for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) { - taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, - (void*)(&(g_stConfInfo.stThreads[i]))); + taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, (void *)(&(g_stConfInfo.stThreads[i]))); } for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); } - // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); - - taosFprintfFile(g_fp, "\n"); - taosCloseFile(&g_fp); - + //printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); + + taosFprintfFile(g_fp, "\n"); + taosCloseFile(&g_fp); + return 0; } From 9f8a447d82eba4f202d6fa3c1a2b86454cba8aad Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 21 Apr 2022 19:56:20 +0800 Subject: [PATCH 3/3] [test: modify tmq test cases] --- tests/script/tsim/tmq/basic1.sim | 371 ++++++++++++---------- tests/script/tsim/tmq/clearConsume.sim | 28 ++ tests/script/tsim/tmq/consume.sh | 22 +- tests/script/tsim/tmq/prepareBasicEnv.sim | 88 +++++ 4 files changed, 343 insertions(+), 166 deletions(-) create mode 100644 tests/script/tsim/tmq/clearConsume.sim create mode 100644 tests/script/tsim/tmq/prepareBasicEnv.sim diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim index 60064b174e..df6a553d1a 100644 --- a/tests/script/tsim/tmq/basic1.sim +++ b/tests/script/tsim/tmq/basic1.sim @@ -8,196 +8,245 @@ # # notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). # -system sh/stop_dnodes.sh -system sh/deploy.sh -n dnode1 -i 1 -system sh/exec.sh -n dnode1 -s start +run tsim/tmq/prepareBasicEnv.sim -$loop_cnt = 0 -check_dnode_ready: - $loop_cnt = $loop_cnt + 1 - sleep 200 - if $loop_cnt == 10 then - print ====> dnode not ready! - return -1 - endi -sql show dnodes -print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05 -if $data00 != 1 then - return -1 -endi -if $data04 != ready then - goto check_dnode_ready -endi +#---- global parameters start ----# +$dbName = db +$vgroups = 1 +$stbPrefix = stb +$ctbPrefix = ctb +$ntbPrefix = ntb +$stbNum = 1 +$ctbNum = 10 +$ntbNum = 10 +$rowsPerCtb = 100 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +#---- global parameters end ----# + +$pullDelay = 3 +$ifcheckdata = 1 +$showMsg = 1 +$showRow = 0 sql connect +sql use $dbName -$dbNamme = d0 -print =============== create database , vgroup 4 -sql create database $dbNamme vgroups 4 -sql use $dbNamme - -print =============== create super table -sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int) - -sql show stables -if $rows != 1 then - return -1 -endi - -print =============== create child table -sql create table ct0 using stb tags(1000) -sql create table ct1 using stb tags(2000) -#sql create table ct3 using stb tags(3000) - -print =============== create normal table -sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10)) - -print =============== create multi topics. notes: now only support: -print =============== 1. columns from stb; 2. * from ctb; 3. columns from ctb -print =============== will support: * from stb; function from stb/ctb - -sql create topic topic_stb_column as select ts, c1, c3 from stb -#sql create topic topic_stb_all as select * from stb +print == create topics from super table +sql create topic topic_stb_column as select ts, c3 from stb +sql create topic topic_stb_all as select ts, c1, c2, c3 from stb sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb -sql create topic topic_ctb_column as select ts, c1, c3 from ct0 -sql create topic topic_ctb_all as select * from ct0 -sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ct0 +print == create topics from child table +sql create topic topic_ctb_column as select ts, c3 from ctb0 +sql create topic topic_ctb_all as select * from ctb0 +sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0 -sql create topic topic_ntb_column as select ts, c1, c3 from ntb -sql create topic topic_ntb_all as select * from ntb -sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb +print == create topics from normal table +sql create topic topic_ntb_column as select ts, c3 from ntb0 +sql create topic topic_ntb_all as select * from ntb0 +sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 -sql show tables -if $rows != 3 then - return -1 -endi - -print =============== insert data - -$tbPrefix = ct -$tbNum = 2 -$rowNum = 10 -$tstart = 1640966400000 # 2022-01-01 00:00:00.000 - -$i = 0 -while $i < $tbNum - $tb = $tbPrefix . $i - - $x = 0 - while $x < $rowNum - $c = $x / 10 - $c = $c * 10 - $c = $x - $c - - $binary = ' . binary - $binary = $binary . $c - $binary = $binary . ' - - sql insert into $tb values ($tstart , $c , $x , $binary ) - sql insert into ntb values ($tstart , $c , $x , $binary ) - $tstart = $tstart + 1 - $x = $x + 1 - endw - - $i = $i + 1 - $tstart = 1640966400000 -endw - -#root@trd02 /home $ tmq_sim --help -# -c Configuration directory, default is -# -d The name of the database for cosumer, no default -# -t The topic string for cosumer, no default -# -k The key-value string for cosumer, no default -# -g showMsgFlag, default is 0 -# - -$totalMsgCnt = $rowNum * $tbNum -print inserted totalMsgCnt: $totalMsgCnt - -# supported key: -# group.id: -# enable.auto.commit: -# auto.offset.reset: -# td.connect.ip: -# td.connect.user:root -# td.connect.pass:taosdata -# td.connect.port:6030 -# td.connect.db:db - -system_content echo -n \$BUILD_DIR -$tmq_sim = $system_content . /build/bin/tmq_sim -system_content echo -n \$SIM_DIR -$tsim_cfg = $system_content . /tsim/cfg - -print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" -system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" -print cmd result----> $system_content -if $system_content != @{consume success: 20, 0}@ then - return -1 -endi - -#print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" -#system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" -#print cmd result----> $system_content -#if $system_content != @{consume success: 20, 0}@ then +#sql show topics +#if $rows != 9 then # return -1 #endi -print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2" -system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2" -print cmd result----> $system_content -if $system_content != @{consume success: 20, 0}@ then - print expect @{consume success: 20, 0}@, actual: $system_content - return -1 +$keyList = ' . group.id:cgrp1 +$keyList = $keyList . ' + +print ================ test consume from stb +$loop_cnt = 0 +loop_consume_diff_topic_from_stb: +if $loop_cnt == 0 then + print == scenario 1: topic_stb_column + $topicList = ' . topic_stb_column + $topicList = $topicList . ' +elif $loop_cnt == 1 then + print == scenario 2: topic_stb_all + $topicList = ' . topic_stb_all + $topicList = $topicList . ' +elif $loop_cnt == 2 then + print == scenario 3: topic_stb_function + $topicList = ' . topic_stb_function + $topicList = $topicList . ' +else + goto loop_consume_diff_topic_from_stb_end endi -print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" -system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" -print cmd result----> $system_content -if $system_content != @{consume success: 10, 0}@ then +$consumerId = 0 +$totalMsgOfStb = $ctbNum * $rowsPerCtb +#$expectmsgcnt = $totalMsgOfStb + 1 +$expectmsgcnt = 110 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from stb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start + +print == check consume result +wait_consumer_end_from_stb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +if $rows != 1 then + sleep 1000 + goto wait_consumer_end_from_stb +endi +if $data[0][1] != $consumerId then return -1 endi - -print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" -system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" -print cmd result----> $system_content -if $system_content != @{consume success: 10, 0}@ then +if $data[0][2] != $expectmsgcnt then return -1 endi - -print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2" -system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2" -print cmd result----> $system_content -if $system_content != @{consume success: 10, 0}@ then +if $data[0][3] != $expectmsgcnt then return -1 endi +$loop_cnt = $loop_cnt + 1 +goto loop_consume_diff_topic_from_stb +loop_consume_diff_topic_from_stb_end: -print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" -system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" -print cmd result----> $system_content -if $system_content != @{consume success: 20, 0}@ then +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdbName = cdb1 +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then return -1 endi +####################################################################################### -print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" -system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" -print cmd result----> $system_content -if $system_content != @{consume success: 20, 0}@ then - return -1 + +print ================ test consume from ctb +$loop_cnt = 0 +loop_consume_diff_topic_from_ctb: +if $loop_cnt == 0 then + print == scenario 1: topic_ctb_column + $topicList = ' . topic_ctb_column + $topicList = $topicList . ' +elif $loop_cnt == 1 then + print == scenario 2: topic_ctb_all + $topicList = ' . topic_ctb_all + $topicList = $topicList . ' +elif $loop_cnt == 2 then + print == scenario 3: topic_ctb_function + $topicList = ' . topic_ctb_function + $topicList = $topicList . ' +else + goto loop_consume_diff_topic_from_ctb_end endi -print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2" -system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2" -print cmd result----> $system_content -if $system_content != @{consume success: 20, 0}@ then +$consumerId = 0 +$totalMsgOfCtb = $rowsPerCtb +$expectmsgcnt = $totalMsgOfCtb + 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from stb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start + +print == check consume result +wait_consumer_end_from_ctb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +if $rows != 1 then + sleep 1000 + goto wait_consumer_end_from_ctb +endi +if $data[0][1] != $consumerId then return -1 endi +if $data[0][2] != $totalMsgOfCtb then + return -1 +endi +if $data[0][3] != $totalMsgOfCtb then + return -1 +endi +$loop_cnt = $loop_cnt + 1 +goto loop_consume_diff_topic_from_ctb +loop_consume_diff_topic_from_ctb_end: -print =============== create database , vgroup 4 -$dbNamme = d1 -sql create database $dbNamme vgroups 4 +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdbName = cdb2 +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName +print == create consume info table and consume result table +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + + +print ================ test consume from ntb +$loop_cnt = 0 +loop_consume_diff_topic_from_ntb: +if $loop_cnt == 0 then + print == scenario 1: topic_ntb_column + $topicList = ' . topic_ntb_column + $topicList = $topicList . ' +elif $loop_cnt == 1 then + print == scenario 2: topic_ntb_all + $topicList = ' . topic_ntb_all + $topicList = $topicList . ' +elif $loop_cnt == 2 then + print == scenario 3: topic_ntb_function + $topicList = ' . topic_ntb_function + $topicList = $topicList . ' +else + goto loop_consume_diff_topic_from_ntb_end +endi + +$consumerId = 0 +$totalMsgOfNtb = $rowsPerCtb +$expectmsgcnt = $totalMsgOfNtb + 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from stb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start + +print == check consume result from ntb +wait_consumer_end_from_ntb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +if $rows != 1 then + sleep 1000 + goto wait_consumer_end_from_ntb +endi +if $data[0][1] != $consumerId then + return -1 +endi +if $data[0][2] != $totalMsgOfNtb then + return -1 +endi +if $data[0][3] != $totalMsgOfNtb then + return -1 +endi +$loop_cnt = $loop_cnt + 1 +goto loop_consume_diff_topic_from_ntb +loop_consume_diff_topic_from_ntb_end: + +#------ not need stop consumer, because it exit after pull msg overthan expect msg +#system tsim/tmq/consume.sh -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/clearConsume.sim b/tests/script/tsim/tmq/clearConsume.sim new file mode 100644 index 0000000000..8b55e0f8c3 --- /dev/null +++ b/tests/script/tsim/tmq/clearConsume.sim @@ -0,0 +1,28 @@ +sql connect + +#---- global parameters start ----# +$dbName = db +$vgroups = 1 +$stbPrefix = stb +$ctbPrefix = ctb +$ntbPrefix = ntb +$stbNum = 1 +$ctbNum = 10 +$ntbNum = 10 +$rowsPerCtb = 100 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +#---- global parameters end ----# + +sql use $dbName + +print == create consume info table and consume result table +sql drop table consumeinfo +sql drop table consumeresult +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi + diff --git a/tests/script/tsim/tmq/consume.sh b/tests/script/tsim/tmq/consume.sh index ac500e6704..28e03d8fb9 100755 --- a/tests/script/tsim/tmq/consume.sh +++ b/tests/script/tsim/tmq/consume.sh @@ -11,16 +11,25 @@ set +e # set default value for parameters EXEC_OPTON=start DB_NAME=db +CDB_NAME=db POLL_DELAY=5 VALGRIND=0 SIGNAL=SIGINT +SHOW_MSG=0 +SHOW_ROW=0 -while getopts "d:s:v:y:x:" arg +while getopts "d:s:v:y:x:g:r:w:" arg do case $arg in d) DB_NAME=$OPTARG ;; + g) + SHOW_MSG=$OPTARG + ;; + r) + SHOW_ROW=$OPTARG + ;; s) EXEC_OPTON=$OPTARG ;; @@ -33,6 +42,9 @@ do x) SIGNAL=$OPTARG ;; + w) + CDB_NAME=$OPTARG + ;; ?) echo "unkown argument" ;; @@ -80,11 +92,11 @@ echo "DB_NAME: $DB_NAME echo "------------------------------------------------------------------------" if [ "$EXEC_OPTON" = "start" ]; then if [ $VALGRIND -eq 1 ]; then - echo nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 & - nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 & + echo nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW > /dev/null 2>&1 & + nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW > /dev/null 2>&1 & else - echo "nohup $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 &" - nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME > /dev/null 2>&1 & + echo "nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW -w $CDB_NAME > /dev/null 2>&1 &" + nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW -w $CDB_NAME > /dev/null 2>&1 & fi else PID=`ps -ef|grep tmq_sim | grep -v grep | awk '{print $2}'` diff --git a/tests/script/tsim/tmq/prepareBasicEnv.sim b/tests/script/tsim/tmq/prepareBasicEnv.sim new file mode 100644 index 0000000000..066d7d4ab0 --- /dev/null +++ b/tests/script/tsim/tmq/prepareBasicEnv.sim @@ -0,0 +1,88 @@ +# stop all dnodes before start this case +system sh/stop_dnodes.sh + +# deploy dnode 1 +system sh/deploy.sh -n dnode1 -i 1 + +# add some config items for this case +#system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 + +# start dnode 1 +system sh/exec.sh -n dnode1 -s start + +sql connect + +#---- global parameters start ----# +$dbName = db +$vgroups = 1 +$stbPrefix = stb +$ctbPrefix = ctb +$ntbPrefix = ntb +$stbNum = 1 +$ctbNum = 10 +$ntbNum = 10 +$rowsPerCtb = 100 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +#---- global parameters end ----# + +print == create database $dbName vgroups $vgroups +sql create database $dbName vgroups $vgroups + +#wait database ready +$loop_cnt = 0 +check_db_ready: +if $loop_cnt == 10 then + print ====> database not ready! + return -1 +endi +sql show databases +print ==> rows: $rows +print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12] +print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20] +if $data(db)[20] != nostrict then + sleep 100 + $loop_cnt = $loop_cnt + 1 + goto check_db_ready +endi + +sql use $dbName + +print == create consume info table and consume result table +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi + +print == create super table +sql create table $stbPrefix (ts timestamp, c1 int, c2 float, c3 binary(16)) tags (t1 int) +sql show stables +if $rows != 1 then + return -1 +endi + +print == create child table, normal table and insert data +$i = 0 +while $i < $ctbNum + $ctb = $ctbPrefix . $i + $ntb = $ntbPrefix . $i + sql create table $ctb using $stbPrefix tags( $i ) + sql create table $ntb (ts timestamp, c1 int, c2 float, c3 binary(16)) + + $x = 0 + while $x < $rowsPerCtb + $binary = ' . binary- + $binary = $binary . $i + $binary = $binary . ' + + sql insert into $ctb values ($tstart , $i , $x , $binary ) + sql insert into $ntb values ($tstart , $i , $x , $binary ) + $tstart = $tstart + 1 + $x = $x + 1 + endw + + $i = $i + 1 + $tstart = 1640966400000 +endw