diff --git a/include/common/tmsg.h b/include/common/tmsg.h index dc83015a89..6ca6738778 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3044,7 +3044,8 @@ typedef struct SDeleteRes { int64_t skey; int64_t ekey; int64_t affectedRows; - char tableFName[TSDB_TABLE_FNAME_LEN]; + char tableFName[TSDB_TABLE_NAME_LEN]; + char tsColName[TSDB_COL_NAME_LEN]; } SDeleteRes; int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 90b804b382..47177dc11b 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -38,7 +38,8 @@ typedef struct SDeleterRes { int64_t skey; int64_t ekey; int64_t affectedRows; - char tableFName[TSDB_TABLE_FNAME_LEN]; + char tableName[TSDB_TABLE_NAME_LEN]; + char tsColName[TSDB_COL_NAME_LEN]; } SDeleterRes; typedef struct SDeleterParam { diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index c91978c125..644185a244 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -151,7 +151,8 @@ typedef struct SVnodeModifyLogicNode { uint64_t tableId; uint64_t stableId; int8_t tableType; // table type - char tableFName[TSDB_TABLE_FNAME_LEN]; + char tableName[TSDB_TABLE_NAME_LEN]; + char tsColName[TSDB_COL_NAME_LEN]; STimeWindow deleteTimeRange; SVgroupsInfo* pVgroupList; SNodeList* pInsertCols; @@ -494,7 +495,7 @@ typedef struct SQueryInserterNode { uint64_t tableId; uint64_t stableId; int8_t tableType; // table type - char tableFName[TSDB_TABLE_FNAME_LEN]; + char tableName[TSDB_TABLE_NAME_LEN]; int32_t vgId; SEpSet epSet; } SQueryInserterNode; @@ -503,7 +504,7 @@ typedef struct SDataDeleterNode { SDataSinkNode sink; uint64_t tableId; int8_t tableType; // table type - char tableFName[TSDB_TABLE_FNAME_LEN]; + char tableFName[TSDB_TABLE_NAME_LEN]; char tsColName[TSDB_COL_NAME_LEN]; STimeWindow deleteTimeRange; SNode* pAffectedRows; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index df9072fe1a..88ebb099e5 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -2823,35 +2823,35 @@ end: // delete from db.tabl where .. -> delete from tabl where .. // delete from db .tabl where .. -> delete from tabl where .. -static void getTbName(char *sql){ - char *ch = sql; - - bool inBackQuote = false; - int8_t dotIndex = 0; - while(*ch != '\0'){ - if(!inBackQuote && *ch == '`'){ - inBackQuote = true; - ch++; - continue; - } - - if(inBackQuote && *ch == '`'){ - inBackQuote = false; - ch++; - - continue; - } - - if(!inBackQuote && *ch == '.'){ - dotIndex ++; - if(dotIndex == 2){ - memmove(sql, ch + 1, strlen(ch + 1) + 1); - break; - } - } - ch++; - } -} +//static void getTbName(char *sql){ +// char *ch = sql; +// +// bool inBackQuote = false; +// int8_t dotIndex = 0; +// while(*ch != '\0'){ +// if(!inBackQuote && *ch == '`'){ +// inBackQuote = true; +// ch++; +// continue; +// } +// +// if(inBackQuote && *ch == '`'){ +// inBackQuote = false; +// ch++; +// +// continue; +// } +// +// if(!inBackQuote && *ch == '.'){ +// dotIndex ++; +// if(dotIndex == 2){ +// memmove(sql, ch + 1, strlen(ch + 1) + 1); +// break; +// } +// } +// ch++; +// } +//} static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { SDeleteRes req = {0}; @@ -2867,9 +2867,9 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { goto end; } - getTbName(req.tableFName); +// getTbName(req.tableFName); char sql[256] = {0}; - sprintf(sql, "delete from `%s` where `%s` >= %" PRId64" and `%s` <= %" PRId64, req.tableFName, "ts", req.skey, "ts", req.ekey); + sprintf(sql, "delete from `%s` where `%s` >= %" PRId64" and `%s` <= %" PRId64, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey); printf("delete sql:%s\n", sql); TAOS_RES* res = taos_query(taos, sql); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 22b7dd827d..3163982dec 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5682,6 +5682,7 @@ int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) { if (tEncodeI64v(pCoder, pRes->affectedRows) < 0) return -1; if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1; + if (tEncodeCStr(pCoder, pRes->tsColName) < 0) return -1; return 0; } @@ -5700,6 +5701,7 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) { if (tDecodeI64v(pCoder, &pRes->affectedRows) < 0) return -1; if (tDecodeCStrTo(pCoder, pRes->tableFName) < 0) return -1; + if (tDecodeCStrTo(pCoder, pRes->tsColName) < 0) return -1; return 0; } int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 391aef529f..06b7c13fa2 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -90,7 +90,8 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp pRes->uidList = pHandle->pParam->pUidList; pRes->skey = pHandle->pDeleter->deleteTimeRange.skey; pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey; - strcpy(pRes->tableFName, pHandle->pDeleter->tableFName); + strcpy(pRes->tableName, pHandle->pDeleter->tableFName); + strcpy(pRes->tsColName, pHandle->pDeleter->tsColName); pRes->affectedRows = *(int64_t*)pColRes->pData; pBuf->useSize += pEntry->dataLen; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 5279d015b4..5fc94c2642 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -401,7 +401,8 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi COPY_SCALAR_FIELD(tableId); COPY_SCALAR_FIELD(stableId); COPY_SCALAR_FIELD(tableType); - COPY_CHAR_ARRAY_FIELD(tableFName); + COPY_CHAR_ARRAY_FIELD(tableName); + COPY_CHAR_ARRAY_FIELD(tsColName); COPY_OBJECT_FIELD(deleteTimeRange, sizeof(STimeWindow)); CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone); CLONE_NODE_LIST_FIELD(pInsertCols); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index c499d6e7cc..1c9c097cba 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2332,7 +2332,7 @@ static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableType, pNode->tableType); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddStringToObject(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableFName); + code = tjsonAddStringToObject(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableName); } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanVgId, pNode->vgId); @@ -2361,7 +2361,7 @@ static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) { code = tjsonGetTinyIntValue(pJson, jkQueryInsertPhysiPlanTableType, &pNode->tableType); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetStringValue(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableFName); + code = tjsonGetStringValue(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableName); } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetIntValue(pJson, jkQueryInsertPhysiPlanVgId, &pNode->vgId); @@ -2376,6 +2376,7 @@ static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) { static const char* jkDeletePhysiPlanTableId = "TableId"; static const char* jkDeletePhysiPlanTableType = "TableType"; static const char* jkDeletePhysiPlanTableFName = "TableFName"; +static const char* jkDeletePhysiPlanTsColName = "TsColName"; static const char* jkDeletePhysiPlanDeleteTimeRangeStartKey = "DeleteTimeRangeStartKey"; static const char* jkDeletePhysiPlanDeleteTimeRangeEndKey = "DeleteTimeRangeEndKey"; static const char* jkDeletePhysiPlanAffectedRows = "AffectedRows"; @@ -2393,6 +2394,9 @@ static int32_t physiDeleteNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddStringToObject(pJson, jkDeletePhysiPlanTableFName, pNode->tableFName); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddStringToObject(pJson, jkDeletePhysiPlanTsColName, pNode->tsColName); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkDeletePhysiPlanDeleteTimeRangeStartKey, pNode->deleteTimeRange.skey); } @@ -2419,6 +2423,9 @@ static int32_t jsonToPhysiDeleteNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetStringValue(pJson, jkDeletePhysiPlanTableFName, pNode->tableFName); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkDeletePhysiPlanTsColName, pNode->tsColName); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBigIntValue(pJson, jkDeletePhysiPlanDeleteTimeRangeStartKey, &pNode->deleteTimeRange.skey); } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 30e3b676df..d405b75003 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1292,8 +1292,8 @@ static int32_t createVnodeModifLogicNodeByDelete(SLogicPlanContext* pCxt, SDelet pModify->modifyType = MODIFY_TABLE_TYPE_DELETE; pModify->tableId = pRealTable->pMeta->uid; pModify->tableType = pRealTable->pMeta->tableType; - snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId, - pRealTable->table.dbName, pRealTable->table.tableName); + snprintf(pModify->tableName, sizeof(pModify->tableName), "%s", pRealTable->table.tableName); + strcpy(pModify->tsColName, pRealTable->pMeta->schema->name); pModify->deleteTimeRange = pDelete->timeRange; pModify->pAffectedRows = nodesCloneNode(pDelete->pCountFunc); if (NULL == pModify->pAffectedRows) { @@ -1342,8 +1342,7 @@ static int32_t createVnodeModifLogicNodeByInsert(SLogicPlanContext* pCxt, SInser pModify->tableId = pRealTable->pMeta->uid; pModify->stableId = pRealTable->pMeta->suid; pModify->tableType = pRealTable->pMeta->tableType; - snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId, - pRealTable->table.dbName, pRealTable->table.tableName); + snprintf(pModify->tableName, sizeof(pModify->tableName), "%s", pRealTable->table.tableName); TSWAP(pModify->pVgroupList, pRealTable->pVgroupList); pModify->pInsertCols = nodesCloneList(pInsert->pCols); if (NULL == pModify->pInsertCols) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 5dc95a0f34..2e5c4255e6 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1588,7 +1588,7 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod pInserter->tableId = pModify->tableId; pInserter->stableId = pModify->stableId; pInserter->tableType = pModify->tableType; - strcpy(pInserter->tableFName, pModify->tableFName); + strcpy(pInserter->tableName, pModify->tableName); pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId; pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet; vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode); @@ -1638,7 +1638,8 @@ static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pDeleter->tableId = pModify->tableId; pDeleter->tableType = pModify->tableType; - strcpy(pDeleter->tableFName, pModify->tableFName); + strcpy(pDeleter->tableFName, pModify->tableName); + strcpy(pDeleter->tsColName, pModify->tsColName); pDeleter->deleteTimeRange = pModify->deleteTimeRange; int32_t code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pAffectedRows, diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index df57d0fef1..d1f8a50dab 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -283,7 +283,8 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes pRes->skey = pDelRes->skey; pRes->ekey = pDelRes->ekey; pRes->affectedRows = pDelRes->affectedRows; - strcpy(pRes->tableFName, pDelRes->tableFName); + strcpy(pRes->tableFName, pDelRes->tableName); + strcpy(pRes->tsColName, pDelRes->tsColName); taosMemoryFree(output.pData); return TSDB_CODE_SUCCESS; diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py new file mode 100644 index 0000000000..a136d0a1a2 --- /dev/null +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -0,0 +1,91 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def checkFileContent(self): + buildPath = tdCom.getBuildPath() + cfgPath = tdCom.getClientCfgPath() + cmdStr = '%s/build/bin/tmq_taosx_ci -c %s'%(buildPath, cfgPath) + tdLog.info(cmdStr) + os.system(cmdStr) + + srcFile = '%s/../log/tmq_taosx_tmp.source'%(cfgPath) + dstFile = '%s/../log/tmq_taosx_tmp.result'%(cfgPath) + tdLog.info("compare file: %s, %s"%(srcFile, dstFile)) + + consumeFile = open(srcFile, mode='r') + queryFile = open(dstFile, mode='r') + + while True: + dst = queryFile.readline() + src = consumeFile.readline() + + if dst: + if dst != src: + tdLog.exit("compare error: %s != %s"%src, dst) + else: + break + + tdSql.execute('use db_taosx') + tdSql.query("select * from ct3") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 51) + tdSql.checkData(0, 4, 940) + tdSql.checkData(1, 1, 23) + tdSql.checkData(1, 4, None) + + tdSql.query("select * from ct1") + tdSql.checkRows(4) + + tdSql.query("select * from ct2") + tdSql.checkRows(0) + + tdSql.query("select * from ct0") + tdSql.checkRows(2) + tdSql.checkData(0, 3, "a") + tdSql.checkData(1, 4, None) + + tdSql.query("select * from n1") + tdSql.checkRows(2) + tdSql.checkData(0, 1, "eeee") + tdSql.checkData(1, 2, 940) + + tdSql.query("select * from jt") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 11) + tdSql.checkData(0, 2, None) + tdSql.checkData(1, 1, 1) + tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}') + + return + + def run(self): + tdSql.prepare() + self.checkFileContent() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 8c19a88ac6..8f5333a4ca 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -227,6 +227,7 @@ python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py +python3 ./test.py -f 7-tmq/tmq_taosx.py # python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py #------------querPolicy 2----------- diff --git a/tests/test/c/CMakeLists.txt b/tests/test/c/CMakeLists.txt index 5db97a0f0f..605eef9be3 100644 --- a/tests/test/c/CMakeLists.txt +++ b/tests/test/c/CMakeLists.txt @@ -1,6 +1,7 @@ add_executable(tmq_demo tmqDemo.c) add_executable(tmq_sim tmqSim.c) add_executable(create_table createTable.c) +add_executable(tmq_taosx_ci tmq_taosx_ci.c) target_link_libraries( create_table PUBLIC taos_static @@ -22,6 +23,13 @@ target_link_libraries( PUBLIC common PUBLIC os ) +target_link_libraries( + tmq_taosx_ci + PUBLIC taos_static + PUBLIC util + PUBLIC common + PUBLIC os +) add_executable(sdbDump sdbDump.c) target_link_libraries( diff --git a/tests/test/c/tmq_taosx_ci.c b/tests/test/c/tmq_taosx_ci.c new file mode 100644 index 0000000000..ece7ad4819 --- /dev/null +++ b/tests/test/c/tmq_taosx_ci.c @@ -0,0 +1,520 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "taos.h" +#include "types.h" + +static int running = 1; +TdFilePtr g_fp = NULL; +char dir[64]={0}; + +static TAOS* use_db(){ + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return NULL; + } + + TAOS_RES* pRes = taos_query(pConn, "use db_taosx"); + if (taos_errno(pRes) != 0) { + printf("error in use db_taosx, reason:%s\n", taos_errstr(pRes)); + return NULL; + } + taos_free_result(pRes); + return pConn; +} + +static void msg_process(TAOS_RES* msg) { + /*memset(buf, 0, 1024);*/ + printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg)); + printf("db: %s\n", tmq_get_db_name(msg)); + printf("vg: %d\n", tmq_get_vgroup_id(msg)); + TAOS *pConn = use_db(); + if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { + char* result = tmq_get_json_meta(msg); + if (result) { + printf("meta result: %s\n", result); + } + taosFprintfFile(g_fp, result); + taosFprintfFile(g_fp, "\n"); + tmq_free_json_meta(result); + } + + tmq_raw_data raw = {0}; + tmq_get_raw(msg, &raw); + int32_t ret = tmq_write_raw(pConn, raw); + printf("write raw data: %s\n", tmq_err2str(ret)); + +// else{ +// while(1){ +// int numOfRows = 0; +// void *pData = NULL; +// taos_fetch_raw_block(msg, &numOfRows, &pData); +// if(numOfRows == 0) break; +// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows); +// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg)); +// printf("write raw data: %s\n", tmq_err2str(ret)); +// } +// } + + taos_close(pConn); +} + +int32_t init_env() { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx"); + if (taos_errno(pRes) != 0) { + printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists db_taosx vgroups 1"); + if (taos_errno(pRes) != 0) { + printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists abc1"); + if (taos_errno(pRes) != 0) { + printf("error in drop db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, + "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " + "nchar(8), t4 bool)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct0 values(1626006833600, 1, 2, 'a')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists ct1 using st1(t1) tags(2000)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists ct2 using st1(t1) tags(NULL)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table ct2, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists ct3 using st1(t1) tags(3000)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table st1 add column c4 bigint"); + if (taos_errno(pRes) != 0) { + printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)"); + if (taos_errno(pRes) != 0) { + printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (1626006833609, 51, 62, 'c333', 940)"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct3 select * from ct1"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)"); + if (taos_errno(pRes) != 0) { + printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table ct3 set tag t1=5000"); + if (taos_errno(pRes) != 0) { + printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "delete from abc1 .ct3 where ts < 1626006833606"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"); + if (taos_errno(pRes) != 0) { + printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table n1 add column c3 bigint"); + if (taos_errno(pRes) != 0) { + printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)"); + if (taos_errno(pRes) != 0) { + printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table n1 rename column c3 cc3"); + if (taos_errno(pRes) != 0) { + printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table n1 comment 'hello'"); + if (taos_errno(pRes) != 0) { + printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table n1 drop column c1"); + if (taos_errno(pRes) != 0) { + printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table jt2 using jt tags('')"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into jt1 values(now, 1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table jt1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into jt2 values(now, 11)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + taos_close(pConn); + return 0; +} + +int32_t create_topic() { + printf("create topic\n"); + TAOS_RES* pRes; + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + taos_close(pConn); + return 0; +} + +void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + printf("commit %d tmq %p param %p\n", code, tmq, param); +} + +tmq_t* build_consumer() { +#if 0 + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); +#endif + + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg2"); + tmq_conf_set(conf, "client.id", "my app 1"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "enable.heartbeat.background", "true"); + + /*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/ + + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + assert(tmq); + tmq_conf_destroy(conf); + return tmq; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "topic_ctb_column"); + /*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/ + return topic_list; +} + +void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { + int32_t code; + + if ((code = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); + printf("subscribe err\n"); + return; + } + int32_t cnt = 0; + while (running) { + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); + if (tmqmessage) { + cnt++; + msg_process(tmqmessage); + /*if (cnt >= 2) break;*/ + /*printf("get data\n");*/ + taos_free_result(tmqmessage); + /*} else {*/ + /*break;*/ + /*tmq_commit_sync(tmq, NULL);*/ + }else{ + break; + } + } + + code = tmq_consumer_close(tmq); + if (code) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { + static const int MIN_COMMIT_COUNT = 1; + + int msg_count = 0; + int32_t code; + + if ((code = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); + return; + } + + tmq_list_t* subList = NULL; + tmq_subscription(tmq, &subList); + char** subTopics = tmq_list_to_c_array(subList); + int32_t sz = tmq_list_get_size(subList); + printf("subscribed topics: "); + for (int32_t i = 0; i < sz; i++) { + printf("%s, ", subTopics[i]); + } + printf("\n"); + tmq_list_destroy(subList); + + while (running) { + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); + if (tmqmessage) { + msg_process(tmqmessage); + taos_free_result(tmqmessage); + + /*tmq_commit_sync(tmq, NULL);*/ + /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ + } + } + + code = tmq_consumer_close(tmq); + if (code) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +void initLogFile() { + char f1[256] = {0}; + char f2[256] = {0}; + + sprintf(f1, "%s/../log/tmq_taosx_tmp.source", dir); + sprintf(f2, "%s/../log/tmq_taosx_tmp.result", dir); + TdFilePtr pFile = taosOpenFile(f1, TD_FILE_TEXT | TD_FILE_TRUNC | TD_FILE_STREAM); + if (NULL == pFile) { + fprintf(stderr, "Failed to open %s for save result\n", f1); + exit(-1); + } + g_fp = pFile; + + TdFilePtr pFile2 = taosOpenFile(f2, TD_FILE_TEXT | TD_FILE_TRUNC | TD_FILE_STREAM); + if (NULL == pFile2) { + fprintf(stderr, "Failed to open %s for save result\n", f2); + exit(-1); + } + char *result[] = { + "{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}", + "{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}", + "{\"type\":\"create\",\"tableName\":\"ct1\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}]}", + "{\"type\":\"create\",\"tableName\":\"ct2\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[]}", + "{\"type\":\"create\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":3000}]}", + "{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}", + "{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":7,\"colName\":\"c3\",\"colType\":8,\"colLength\":64}", + "{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":1,\"colName\":\"t2\",\"colType\":8,\"colLength\":64}", + "{\"type\":\"alter\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"alterType\":4,\"colName\":\"t1\",\"colValue\":\"5000\",\"colValueNull\":false}", + "{\"type\":\"create\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":10,\"length\":4}],\"tags\":[]}", + "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":5,\"colName\":\"c3\",\"colType\":5}", + "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":7,\"colName\":\"c2\",\"colType\":10,\"colLength\":8}", + "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":10,\"colName\":\"c3\",\"colNewName\":\"cc3\"}", + "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":9}", + "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":6,\"colName\":\"c1\"}", + "{\"type\":\"create\",\"tableName\":\"jt\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}", + "{\"type\":\"create\",\"tableName\":\"jt1\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}]}", + "{\"type\":\"create\",\"tableName\":\"jt2\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]}" + }; + + for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){ + taosFprintfFile(pFile2, result[i]); + taosFprintfFile(pFile2, "\n"); + } + taosCloseFile(&pFile2); +} + +int main(int argc, char* argv[]) { + if(argc == 3 && strcmp(argv[1], "-c") == 0) { + strcpy(dir, argv[2]); + }else{ + strcpy(dir, "../../../sim/psim/cfg"); + } + + printf("env init\n"); + initLogFile(); + + if (init_env() < 0) { + return -1; + } + create_topic(); + + tmq_t* tmq = build_consumer(); + tmq_list_t* topic_list = build_topic_list(); + basic_consume_loop(tmq, topic_list); + /*sync_consume_loop(tmq, topic_list);*/ + taosCloseFile(&g_fp); +}