diff --git a/include/client/taos.h b/include/client/taos.h index 40772e9d2c..626264d94e 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -92,14 +92,16 @@ typedef struct taosField { typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); -DLL_EXPORT void taos_cleanup(void); -DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); -DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); -DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port); -DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); -DLL_EXPORT void taos_close(TAOS *taos); +typedef struct tmq_t tmq_t; +typedef struct tmq_conf_t tmq_conf_t; +typedef struct tmq_list_t tmq_list_t; -const char *taos_data_type(int type); +typedef struct tmq_message_t tmq_message_t; +typedef struct tmq_message_topic_t tmq_message_topic_t; +typedef struct tmq_message_tb_t tmq_message_tb_t; +typedef struct tmq_tb_iter_t tmq_tb_iter_t; +typedef struct tmq_message_col_t tmq_message_col_t; +typedef struct tmq_col_iter_t tmq_col_iter_t; typedef struct TAOS_BIND { int buffer_type; @@ -134,6 +136,15 @@ typedef struct TAOS_MULTI_BIND { int num; } TAOS_MULTI_BIND; +DLL_EXPORT void taos_cleanup(void); +DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); +DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); +DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port); +DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); +DLL_EXPORT void taos_close(TAOS *taos); + +const char *taos_data_type(int type); + DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags); @@ -192,16 +203,6 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); -typedef struct tmq_t tmq_t; -typedef struct tmq_conf_t tmq_conf_t; -typedef struct tmq_list_t tmq_list_t; - -typedef struct tmq_message_t tmq_message_t; -typedef struct tmq_message_topic_t tmq_message_topic_t; -typedef struct tmq_message_tb_t tmq_message_tb_t; -typedef struct tmq_tb_iter_t tmq_tb_iter_t; -typedef struct tmq_message_col_t tmq_message_col_t; -typedef struct tmq_col_iter_t tmq_col_iter_t; DLL_EXPORT tmq_list_t* tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*); diff --git a/include/common/common.h b/include/common/common.h index 0299a29eb4..056d2789fe 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -38,6 +38,12 @@ // int16_t bytes; //} SSchema; +typedef struct { + uint32_t numOfTables; + SArray *pGroupList; + SHashObj *map; // speedup acquire the tableQueryInfo by table uid +} STableGroupInfo; + typedef struct SColumnDataAgg { int16_t colId; int64_t sum; @@ -57,17 +63,12 @@ typedef struct SDataBlockInfo { typedef struct SConstantItem { SColumnInfo info; - int32_t startIndex; // run-length-encoding to save the space for multiple rows - int32_t endIndex; + int32_t startRow; // run-length-encoding to save the space for multiple rows + int32_t endRow; SVariant value; } SConstantItem; -typedef struct { - uint32_t numOfTables; - SArray *pGroupList; - SHashObj *map; // speedup acquire the tableQueryInfo by table uid -} STableGroupInfo; - +// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); typedef struct SSDataBlock { SColumnDataAgg *pBlockAgg; SArray *pDataBlock; // SArray @@ -75,9 +76,12 @@ typedef struct SSDataBlock { SDataBlockInfo info; } SSDataBlock; +// pBlockAgg->numOfNull == info.rows, all data are null +// pBlockAgg->numOfNull == 0, no data are null. typedef struct SColumnInfoData { - SColumnInfo info; // TODO filter info needs to be removed - char *pData; // the corresponding block data in memory + SColumnInfo info; // TODO filter info needs to be removed + char *nullbitmap;// + char *pData; // the corresponding block data in memory } SColumnInfoData; static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5de006307d..bbe3975cfb 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -296,6 +296,7 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { } return buf; } + typedef struct { int32_t acctId; int64_t clusterId; @@ -303,6 +304,7 @@ typedef struct { int8_t superUser; int8_t align[3]; SEpSet epSet; + char sVersion[128]; } SConnectRsp; typedef struct { @@ -513,21 +515,26 @@ typedef struct { } SAlterDbReq; typedef struct { - char db[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; int8_t ignoreNotExists; } SDropDbReq; typedef struct { - char db[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + uint64_t uid; +} SDropDbRsp; + +typedef struct { + char db[TSDB_DB_FNAME_LEN]; int32_t vgVersion; } SUseDbReq; typedef struct { - char db[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; } SSyncDbReq; typedef struct { - char db[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; } SCompactDbReq; typedef struct { diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b346e54add..77249e3f0b 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -122,6 +122,7 @@ typedef struct STscObj { char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; char db[TSDB_DB_FNAME_LEN]; + char ver[128]; int32_t acctId; uint32_t connId; int32_t connType; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 4100781a0b..d7f0afa785 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -218,12 +218,10 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag, if (pQueryNode->type == TSDB_SQL_SELECT) { setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols); - tfree(pSchema); pRequest->type = TDMT_VND_QUERY; - } else { - tfree(pSchema); } + tfree(pSchema); return code; } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index fda7287a4b..bdf3115981 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -277,3 +277,70 @@ int taos_affected_rows(TAOS_RES *res) { } int taos_result_precision(TAOS_RES *res) { return TSDB_TIME_PRECISION_MILLI; } + +int taos_select_db(TAOS *taos, const char *db) { + STscObj *pObj = (STscObj *)taos; + if (pObj == NULL) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return TSDB_CODE_TSC_DISCONNECTED; + } + + if (db == NULL || strlen(db) == 0) { + terrno = TSDB_CODE_TSC_INVALID_INPUT; + return terrno; + } + + char sql[256] = {0}; + snprintf(sql, tListLen(sql), "use %s", db); + + TAOS_RES* pRequest = taos_query(taos, sql); + int32_t code = taos_errno(pRequest); + + taos_free_result(pRequest); + return code; +} + +void taos_stop_query(TAOS_RES *res) { + if (res == NULL) { + return; + } + + SRequestObj* pRequest = (SRequestObj*) res; + int32_t numOfFields = taos_num_fields(pRequest); + + // It is not a query, no need to stop. + if (numOfFields == 0) { + return; + } + +// scheduleCancelJob(pRequest->body.pQueryJob); +} + +bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { + return false; +} + +int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { + return 0; +} + +int taos_validate_sql(TAOS *taos, const char *sql) { + return true; +} + +const char *taos_get_server_info(TAOS *taos) { + if (taos == NULL) { + return NULL; + } + + STscObj* pTscObj = (STscObj*) taos; + return pTscObj->ver; +} + +void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { + // TODO +} + +void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { + // TODO +} \ No newline at end of file diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 028f44da14..c9b1d07965 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -68,6 +68,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->connId = pConnect->connId; pTscObj->acctId = pConnect->acctId; + tstrncpy(pTscObj->ver, pConnect->sVersion, tListLen(pTscObj->ver)); // update the appInstInfo pTscObj->pAppInfo->clusterId = pConnect->clusterId; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 4dda0b3757..3d57a5de1d 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -61,182 +61,182 @@ TEST(testCase, connect_Test) { taos_close(pConn); } -//TEST(testCase, create_user_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, create_account_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, drop_account_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, show_user_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "show users"); -// TAOS_ROW pRow = NULL; -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, drop_user_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "drop user abc"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, show_db_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "show databases"); -// TAOS_ROW pRow = NULL; -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_close(pConn); -//} -// -//TEST(testCase, create_db_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); -// if (taos_errno(pRes) != 0) { -// printf("error in create db, reason:%s\n", taos_errstr(pRes)); -// } -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// ASSERT_TRUE(pFields == NULL); -// -// int32_t numOfFields = taos_num_fields(pRes); -// ASSERT_EQ(numOfFields, 0); -// -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create database abc1 vgroups 4"); -// if (taos_errno(pRes) != 0) { -// printf("error in create db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_close(pConn); -//} -// -//TEST(testCase, create_dnode_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create dnode abc1 port 7000"); -// if (taos_errno(pRes) != 0) { -// printf("error in create dnode, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create dnode 1.1.1.1 port 9000"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create dnode, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// taos_close(pConn); -//} -// -//TEST(testCase, drop_dnode_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "drop dnode 3"); -// if (taos_errno(pRes) != 0) { -// printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); -// } -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// ASSERT_TRUE(pFields == NULL); -// -// int32_t numOfFields = taos_num_fields(pRes); -// ASSERT_EQ(numOfFields, 0); -// -// pRes = taos_query(pConn, "drop dnode 4"); -// if (taos_errno(pRes) != 0) { -// printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, use_db_test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in use db, reason:%s\n", taos_errstr(pRes)); -// } -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// ASSERT_TRUE(pFields == NULL); -// -// int32_t numOfFields = taos_num_fields(pRes); -// ASSERT_EQ(numOfFields, 0); -// -// taos_close(pConn); -//} -// +TEST(testCase, create_user_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, create_account_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, drop_account_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, show_user_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "show users"); + TAOS_ROW pRow = NULL; + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, drop_user_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "drop user abc"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, show_db_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "show databases"); + TAOS_ROW pRow = NULL; + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_close(pConn); +} + +TEST(testCase, create_db_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == NULL); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database abc1 vgroups 4"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } + taos_close(pConn); +} + +TEST(testCase, create_dnode_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create dnode abc1 port 7000"); + if (taos_errno(pRes) != 0) { + printf("error in create dnode, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create dnode 1.1.1.1 port 9000"); + if (taos_errno(pRes) != 0) { + printf("failed to create dnode, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + taos_close(pConn); +} + +TEST(testCase, drop_dnode_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "drop dnode 3"); + if (taos_errno(pRes) != 0) { + printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); + } + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == NULL); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + pRes = taos_query(pConn, "drop dnode 4"); + if (taos_errno(pRes) != 0) { + printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, use_db_test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == NULL); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_close(pConn); +} + // TEST(testCase, drop_db_test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); @@ -258,251 +258,251 @@ TEST(testCase, connect_Test) { // taos_free_result(pRes); // taos_close(pConn); //} -// -//TEST(testCase, create_stable_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); -// if (taos_errno(pRes) != 0) { -// printf("error in create db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create table if not exists abc1.st1(ts timestamp, k int) tags(a int)"); -// if (taos_errno(pRes) != 0) { -// printf("error in create stable, reason:%s\n", taos_errstr(pRes)); -// } -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// ASSERT_TRUE(pFields == NULL); -// -// int32_t numOfFields = taos_num_fields(pRes); -// ASSERT_EQ(numOfFields, 0); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); -// } -// -// pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// pRes = taos_query(pConn, "drop stable `123_$^)`"); -// if (taos_errno(pRes) != 0) { -// printf("failed to drop super table 123_$^), reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_close(pConn); -//} -// -//TEST(testCase, create_table_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)"); -// ASSERT_EQ(taos_errno(pRes), 0); -// -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)"); -// ASSERT_NE(taos_errno(pRes), 0); -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, create_ctable_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int ) tags(a int)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create stable, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, show_stable_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != nullptr); -// -// TAOS_RES* pRes = taos_query(pConn, "show abc1.stables"); -// if (taos_errno(pRes) != 0) { -// printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, show_vgroup_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "show vgroups"); -// if (taos_errno(pRes) != 0) { -// printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, create_multiple_tables) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_NE(pConn, nullptr); -// -// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create db, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// taos_close(pConn); -// return; -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// taos_close(pConn); -// return; -// } -// -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create stable tables, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// taos_free_result(pRes); -// pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// -// for (int32_t i = 0; i < 20; ++i) { -// char sql[512] = {0}; -// snprintf(sql, tListLen(sql), -// "create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)", i, -// (i + 1) * 30, (i + 2) * 40); -// TAOS_RES* pres = taos_query(pConn, sql); -// if (taos_errno(pres) != 0) { -// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); -// } -// taos_free_result(pres); -// } -// -// taos_close(pConn); -//} -// -//TEST(testCase, show_table_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "show tables"); -// if (taos_errno(pRes) != 0) { -// printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// } -// -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "show abc1.tables"); -// if (taos_errno(pRes) != 0) { -// printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// int32_t count = 0; -// char str[512] = {0}; -// -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%d: %s\n", ++count, str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// + +TEST(testCase, create_stable_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists abc1.st1(ts timestamp, k int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("error in create stable, reason:%s\n", taos_errstr(pRes)); + } + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == NULL); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); + } + + pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + pRes = taos_query(pConn, "drop stable `123_$^)`"); + if (taos_errno(pRes) != 0) { + printf("failed to drop super table 123_$^), reason:%s\n", taos_errstr(pRes)); + } + + taos_close(pConn); +} + +TEST(testCase, create_table_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)"); + ASSERT_EQ(taos_errno(pRes), 0); + + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)"); + ASSERT_NE(taos_errno(pRes), 0); + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, create_ctable_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int ) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("failed to create stable, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, show_stable_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != nullptr); + + TAOS_RES* pRes = taos_query(pConn, "show abc1.stables"); + if (taos_errno(pRes) != 0) { + printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, show_vgroup_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "show vgroups"); + if (taos_errno(pRes) != 0) { + printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, create_multiple_tables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to create db, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + taos_close(pConn); + return; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + taos_close(pConn); + return; + } + + taos_free_result(pRes); + + pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("failed to create stable tables, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + taos_free_result(pRes); + pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); + if (taos_errno(pRes) != 0) { + printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + + for (int32_t i = 0; i < 20; ++i) { + char sql[512] = {0}; + snprintf(sql, tListLen(sql), + "create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)", i, + (i + 1) * 30, (i + 2) * 40); + TAOS_RES* pres = taos_query(pConn, sql); + if (taos_errno(pres) != 0) { + printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); + } + taos_free_result(pres); + } + + taos_close(pConn); +} + +TEST(testCase, show_table_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "show tables"); + if (taos_errno(pRes) != 0) { + printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + } + + taos_free_result(pRes); + + pRes = taos_query(pConn, "show abc1.tables"); + if (taos_errno(pRes) != 0) { + printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + int32_t count = 0; + char str[512] = {0}; + + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%d: %s\n", ++count, str); + } + + taos_free_result(pRes); + taos_close(pConn); +} + //TEST(testCase, drop_stable_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != nullptr); @@ -527,177 +527,177 @@ TEST(testCase, connect_Test) { // taos_free_result(pRes); // taos_close(pConn); //} -// -//TEST(testCase, generated_request_id_test) { -// SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); -// -// for (int32_t i = 0; i < 50000; ++i) { -// uint64_t v = generateRequestId(); -// void* result = taosHashGet(phash, &v, sizeof(v)); -// if (result != nullptr) { -// printf("0x%lx, index:%d\n", v, i); -// } -// assert(result == nullptr); -// taosHashPut(phash, &v, sizeof(v), NULL, 0); -// } -// -// taosHashCleanup(phash); -//} -// -//TEST(testCase, insert_test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_NE(pConn, nullptr); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create into table t_2, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//#if 0 -//TEST(testCase, create_topic_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in use db, reason:%s\n", taos_errstr(pRes)); -// } -// //taos_free_result(pRes); -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// ASSERT_TRUE(pFields == nullptr); -// -// int32_t numOfFields = taos_num_fields(pRes); -// ASSERT_EQ(numOfFields, 0); -// -// taos_free_result(pRes); -// -// char* sql = "select * from tu"; -// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -// -//TEST(testCase, tmq_subscribe_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// tmq_conf_t* conf = tmq_conf_new(); -// tmq_conf_set(conf, "group.id", "tg1"); -// tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); -// -// tmq_list_t* topic_list = tmq_list_new(); -// tmq_list_append(topic_list, "test_topic_1"); -// tmq_subscribe(tmq, topic_list); -// -// while (1) { -// tmq_message_t* msg = tmq_consume_poll(tmq, 0); -// printf("get msg\n"); -// //if (msg == NULL) break; -// } -//} -//#endif -// -//TEST(testCase, tmq_consume_Test) { -//} -// -//TEST(testCase, tmq_commit_TEST) { -//} -// -//TEST(testCase, projection_query_tables) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_NE(pConn, nullptr); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create table tu using st1 tags(1)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// for(int32_t i = 0; i < 100000; ++i) { -// char sql[512] = {0}; -// sprintf(sql, "insert into tu values(now+%da, %d)", i, i); -// TAOS_RES* p = taos_query(pConn, sql); -// if (taos_errno(p) != 0) { -// printf("failed to insert data, reason:%s\n", taos_errstr(p)); -// } -// -// taos_free_result(p); -// } -// -// pRes = taos_query(pConn, "select * from tu"); -// if (taos_errno(pRes) != 0) { -// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, projection_query_stables) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_NE(pConn, nullptr); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "select ts from st1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} + +TEST(testCase, generated_request_id_test) { + SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + + for (int32_t i = 0; i < 50000; ++i) { + uint64_t v = generateRequestId(); + void* result = taosHashGet(phash, &v, sizeof(v)); + if (result != nullptr) { + printf("0x%lx, index:%d\n", v, i); + } + assert(result == nullptr); + taosHashPut(phash, &v, sizeof(v), NULL, 0); + } + + taosHashCleanup(phash); +} + +TEST(testCase, insert_test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create into table t_2, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +#if 0 +TEST(testCase, create_topic_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + //taos_free_result(pRes); + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == nullptr); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + + char* sql = "select * from tu"; + pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); + taos_free_result(pRes); + taos_close(pConn); +} + + +TEST(testCase, tmq_subscribe_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg1"); + tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); + + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "test_topic_1"); + tmq_subscribe(tmq, topic_list); + + while (1) { + tmq_message_t* msg = tmq_consume_poll(tmq, 0); + printf("get msg\n"); + //if (msg == NULL) break; + } +} +#endif + +TEST(testCase, tmq_consume_Test) { +} + +TEST(testCase, tmq_commit_TEST) { +} + +TEST(testCase, projection_query_tables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tu using st1 tags(1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + for(int32_t i = 0; i < 100000; ++i) { + char sql[512] = {0}; + sprintf(sql, "insert into tu values(now+%da, %d)", i, i); + TAOS_RES* p = taos_query(pConn, sql); + if (taos_errno(p) != 0) { + printf("failed to insert data, reason:%s\n", taos_errstr(p)); + } + + taos_free_result(p); + } + + pRes = taos_query(pConn, "select * from tu"); + if (taos_errno(pRes) != 0) { + printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, projection_query_stables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "select ts from st1"); + if (taos_errno(pRes) != 0) { + printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} TEST(testCase, agg_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); diff --git a/source/common/src/tep.c b/source/common/src/tep.c index cf38ab8dd9..45587a8856 100644 --- a/source/common/src/tep.c +++ b/source/common/src/tep.c @@ -1,4 +1,5 @@ #include "tep.h" +#include "common.h" #include "tglobal.h" #include "tlockfree.h" @@ -59,3 +60,99 @@ SEpSet getEpSet_s(SCorEpSet *pEpSet) { return ep; } +bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row, SColumnDataAgg* pColAgg) { + if (pColAgg != NULL) { + if (pColAgg->numOfNull == totalRows) { + ASSERT(pColumnInfoData->nullbitmap == NULL); + return true; + } else if (pColAgg->numOfNull == 0) { + ASSERT(pColumnInfoData->nullbitmap == NULL); + return false; + } + } + + if (pColumnInfoData->nullbitmap == NULL) { + return false; + } + + uint8_t v = (pColumnInfoData->nullbitmap[row>>3] & (1<<(8 - (row&0x07)))); + return (v == 1); +} + +bool colDataIsNull_f(const char* bitmap, uint32_t row) { + return (bitmap[row>>3] & (1<<(8 - (row&0x07)))); +} + +void colDataSetNull_f(char* bitmap, uint32_t row) { // TODO + return; +} + +void* colDataGet(const SColumnInfoData* pColumnInfoData, uint32_t row) { + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + uint32_t offset = ((uint32_t*)pColumnInfoData->pData)[row]; + return (char*)(pColumnInfoData->pData) + offset; // the first part is the pointer to the true binary data + } else { + return (char*)(pColumnInfoData->pData) + (row * pColumnInfoData->info.bytes); + } +} + +int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull) { + ASSERT(pColumnInfoData != NULL); + + if (isNull) { + // TODO set null value in the nullbitmap + return 0; + } + + int32_t type = pColumnInfoData->info.type; + if (IS_VAR_DATA_TYPE(type)) { + // TODO continue append var_type + } else { + char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow; + switch(type) { + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: {*(int8_t*) p = *(int8_t*) pData;break;} + default: + assert(0); + } + + } + + return 0; +} + +size_t colDataGetCols(const SSDataBlock* pBlock) { + ASSERT(pBlock); + + size_t constantCols = (pBlock->pConstantList != NULL)? taosArrayGetSize(pBlock->pConstantList):0; + ASSERT( pBlock->info.numOfCols == taosArrayGetSize(pBlock->pDataBlock) + constantCols); + return pBlock->info.numOfCols; +} + +size_t colDataGetRows(const SSDataBlock* pBlock) { + return pBlock->info.rows; +} + +int32_t colDataUpdateTsWindow(SSDataBlock* pDataBlock) { + if (pDataBlock == NULL || pDataBlock->info.rows <= 0) { + return 0; + } + + if (pDataBlock->info.numOfCols <= 0) { + return -1; + } + + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0); + if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) { + return 0; + } + + ASSERT(pColInfoData->nullbitmap == NULL); + pDataBlock->info.window.skey = *(TSKEY*) colDataGet(pColInfoData, 0); + pDataBlock->info.window.ekey = *(TSKEY*) colDataGet(pColInfoData, (pDataBlock->info.rows - 1)); + return 0; +} + + + + diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 5ec9173fc8..00d1b61b74 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -128,6 +128,8 @@ typedef struct { int32_t failedTimes; void* rpcHandle; void* rpcAHandle; + void* rpcRsp; + int32_t rpcRspLen; SArray* redoLogs; SArray* undoLogs; SArray* commitLogs; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index fda3fed13d..f1a213790c 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -36,16 +36,17 @@ typedef struct { int32_t mndInitTrans(SMnode *pMnode); void mndCleanupTrans(SMnode *pMnode); -STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg); +STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq); void mndTransDrop(STrans *pTrans); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); +void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); -void mndTransProcessRsp(SMnodeMsg *pMsg); +void mndTransProcessRsp(SMnodeMsg *pRsp); void mndTransPullup(SMnode *pMnode); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 73c67fc59a..87a8becf33 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -767,6 +767,14 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) { if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; + + int32_t rspLen = sizeof(SDropDbRsp); + SDropDbRsp *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) goto DROP_DB_OVER; + memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN); + pRsp->uid = htobe64(pDb->uid); + mndTransSetRpcRsp(pTrans, pRsp, rspLen); + if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER; code = 0; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index fbb4e5cef7..a969c0162f 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -237,13 +237,11 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { if (pIter == NULL) break; if (pObj->pDnode == NULL) break; - pEpSet->eps[pEpSet->numOfEps].port = htons(pObj->pDnode->port); - memcpy(pEpSet->eps[pEpSet->numOfEps].fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN); if (pObj->role == TAOS_SYNC_STATE_LEADER) { pEpSet->inUse = pEpSet->numOfEps; } - pEpSet->numOfEps++; + addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, htons(pObj->pDnode->port)); sdbRelease(pSdb, pObj); } } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 005f9a3d3c..d379d294ab 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -14,14 +14,15 @@ */ #define _DEFAULT_SOURCE +#include "tglobal.h" #include "mndProfile.h" -#include "mndConsumer.h" +//#include "mndConsumer.h" #include "mndDb.h" #include "mndMnode.h" #include "mndShow.h" -#include "mndTopic.h" +//#include "mndTopic.h" #include "mndUser.h" -#include "mndVgroup.h" +//#include "mndVgroup.h" #define QUERY_ID_SIZE 20 #define QUERY_OBJ_ID_SIZE 18 @@ -230,10 +231,12 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) { goto CONN_OVER; } - pRsp->acctId = htonl(pUser->acctId); + pRsp->acctId = htonl(pUser->acctId); pRsp->superUser = pUser->superUser; pRsp->clusterId = htobe64(pMnode->clusterId); - pRsp->connId = htonl(pConn->id); + pRsp->connId = htonl(pConn->id); + + snprintf(pRsp->sVersion, tListLen(pRsp->sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, gitinfo); mndGetMnodeEpSet(pMnode, &pRsp->epSet); pReq->contLen = sizeof(SConnectRsp); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 808e6dcbe5..6686c0887f 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -308,6 +308,11 @@ static void mndTransDropData(STrans *pTrans) { mndTransDropLogs(pTrans->commitLogs); mndTransDropActions(pTrans->redoActions); mndTransDropActions(pTrans->undoActions); + if (pTrans->rpcRsp != NULL) { + rpcFreeCont(pTrans->rpcRsp); + pTrans->rpcRsp = NULL; + pTrans->rpcRspLen = 0; + } } static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { @@ -339,7 +344,7 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) { sdbRelease(pSdb, pTrans); } -STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) { +STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) { STrans *pTrans = calloc(1, sizeof(STrans)); if (pTrans == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -350,8 +355,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) { pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS); pTrans->stage = TRN_STAGE_PREPARE; pTrans->policy = policy; - pTrans->rpcHandle = pMsg->handle; - pTrans->rpcAHandle = pMsg->ahandle; + pTrans->rpcHandle = pReq->handle; + pTrans->rpcAHandle = pReq->ahandle; pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); @@ -436,6 +441,11 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) { return mndTransAppendAction(pTrans->undoActions, pAction); } +void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) { + pTrans->rpcRsp = pCont; + pTrans->rpcRspLen = contLen; +} + static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { SSdbRaw *pRaw = mndTransActionEncode(pTrans); if (pRaw == NULL) { @@ -479,6 +489,11 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { pNew->rpcHandle = pTrans->rpcHandle; pNew->rpcAHandle = pTrans->rpcAHandle; + pNew->rpcRsp = pTrans->rpcRsp; + pNew->rpcRspLen = pTrans->rpcRspLen; + pTrans->rpcRsp = NULL; + pTrans->rpcRspLen = 0; + mndTransExecute(pMnode, pNew); mndReleaseTrans(pMnode, pNew); return 0; @@ -529,15 +544,21 @@ static void mndTransSendRpcRsp(STrans *pTrans) { if (sendRsp && pTrans->rpcHandle != NULL) { mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, pTrans->rpcAHandle); - SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle}; + SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, + .code = pTrans->code, + .ahandle = pTrans->rpcAHandle, + .pCont = pTrans->rpcRsp, + .contLen = pTrans->rpcRspLen}; rpcSendResponse(&rspMsg); pTrans->rpcHandle = NULL; + pTrans->rpcRsp = NULL; + pTrans->rpcRspLen = 0; } } -void mndTransProcessRsp(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - int64_t signature = (int64_t)(pMsg->rpcMsg.ahandle); +void mndTransProcessRsp(SMnodeMsg *pRsp) { + SMnode *pMnode = pRsp->pMnode; + int64_t signature = (int64_t)(pRsp->rpcMsg.ahandle); int32_t transId = (int32_t)(signature >> 32); int32_t action = (int32_t)((signature << 32) >> 32); @@ -571,10 +592,10 @@ void mndTransProcessRsp(SMnodeMsg *pMsg) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction != NULL) { pAction->msgReceived = 1; - pAction->errCode = pMsg->rpcMsg.code; + pAction->errCode = pRsp->rpcMsg.code; } - mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pMsg->rpcMsg.code, + mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pRsp->rpcMsg.code, pAction->acceptableCode); mndTransExecute(pMnode, pTrans); @@ -921,7 +942,7 @@ static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) { void mndTransPullup(SMnode *pMnode) { STrans *pTrans = NULL; - void * pIter = NULL; + void *pIter = NULL; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); diff --git a/source/dnode/mnode/impl/test/db/db.cpp b/source/dnode/mnode/impl/test/db/db.cpp index 964a483aac..2d1574467e 100644 --- a/source/dnode/mnode/impl/test/db/db.cpp +++ b/source/dnode/mnode/impl/test/db/db.cpp @@ -202,6 +202,10 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, 0); + + SDropDbRsp* pDrop = (SDropDbRsp*)pRsp->pCont; + pDrop->uid = htobe64(pDrop->uid); + EXPECT_STREQ(pDrop->db, "1.d1"); } test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, ""); @@ -249,6 +253,8 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { EXPECT_EQ(test.GetShowRows(), 1); CheckBinary("d2", TSDB_DB_NAME_LEN - 1); + uint64_t d2_uid = 0; + { int32_t contLen = sizeof(SUseDbReq); @@ -262,6 +268,8 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { SUseDbRsp* pRsp = (SUseDbRsp*)pMsg->pCont; EXPECT_STREQ(pRsp->db, "1.d2"); + pRsp->uid = htobe64(pRsp->uid); + d2_uid = pRsp->uid; pRsp->vgVersion = htonl(pRsp->vgVersion); pRsp->vgNum = htonl(pRsp->vgNum); pRsp->hashMethod = pRsp->hashMethod; @@ -311,5 +319,10 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, 0); + + SDropDbRsp* pDrop = (SDropDbRsp*)pRsp->pCont; + pDrop->uid = htobe64(pDrop->uid); + EXPECT_STREQ(pDrop->db, "1.d2"); + EXPECT_EQ(pDrop->uid, d2_uid); } } diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 8d58b262aa..5d96ab47c3 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -1125,8 +1125,6 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) { *str = cJSON_Print(json); cJSON_Delete(json); -// printf("====Physical plan:====\n"); -// printf("%s\n", *str); *len = strlen(*str) + 1; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index d546925c5f..e6b7eaca7f 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -40,7 +40,8 @@ void qDestroyQueryDag(struct SQueryDag* pDag) { tfree(pDag); } -int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, SSchema** pResSchema, int32_t* numOfCols, SArray* pNodeList, uint64_t requestId) { +int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, SSchema** pResSchema, int32_t* numOfCols, SArray* pNodeList, + uint64_t requestId) { SQueryPlanNode* pLogicPlan; int32_t code = createQueryPlan(pNode, &pLogicPlan); if (TSDB_CODE_SUCCESS != code) { @@ -49,9 +50,10 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, } if (pLogicPlan->info.type != QNODE_MODIFY) { -// char* str = NULL; -// queryPlanToString(pLogicPlan, &str); -// printf("%s\n", str); + char* str = NULL; + queryPlanToString(pLogicPlan, &str); + qDebug("reqId:0x%"PRIx64": %s", requestId, str); + tfree(str); } code = optimizeQueryPlan(pLogicPlan); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 92a95edf98..621ed68bae 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1194,17 +1194,18 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { code = atomic_load_32(&pJob->errCode); SCH_ERR_RET(code); - SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); } SSubplan *plan = pTask->plan; - if (NULL == pTask->msg) { + if (NULL == pTask->msg) { // TODO add more detailed reason for failure code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen); if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) { - SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen); + SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen); SCH_ERR_JRET(code); + } else { + SCH_TASK_DLOG(" ===physical plan=== len:%d, %s", pTask->msgLen, pTask->msg); } } @@ -1218,13 +1219,10 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { } SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); - return TSDB_CODE_SUCCESS; _return: - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code)); - SCH_RET(code); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d64df9b0f3..069ebaeb8a 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -45,13 +45,13 @@ typedef struct SCliThrdObj { pthread_t thread; uv_loop_t* loop; uv_async_t* cliAsync; // - uv_timer_t* pTimer; + uv_timer_t* timer; void* pool; // conn pool queue msg; pthread_mutex_t msgMtx; uint64_t nextTimeout; // next timeout void* pTransInst; // - + bool quit; } SCliThrdObj; typedef struct SClientObj { @@ -94,6 +94,8 @@ static void clientHandleResp(SCliConn* conn); static void clientHandleExcept(SCliConn* conn); // handle req from app static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void clientSendQuit(SCliThrdObj* thrd); static void destroyUserdata(SRpcMsg* userdata); @@ -136,8 +138,8 @@ static void clientHandleResp(SCliConn* conn) { destroyCmsg(pMsg); conn->data = NULL; // start thread's timer of conn pool if not active - if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) { - uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); + if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) { + uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } } static void clientHandleExcept(SCliConn* pConn) { @@ -155,7 +157,7 @@ static void clientHandleExcept(SCliConn* pConn) { SRpcMsg rpcMsg = {0}; rpcMsg.ahandle = pCtx->ahandle; - rpcMsg.code = -1; + rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; // SRpcInfo* pRpc = pMsg->ctx->pRpc; (pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL); pConn->notifyCount += 1; @@ -332,9 +334,8 @@ static void clientWriteCb(uv_write_t* req, int status) { tDebug("conn %p data already was written out", pConn); SCliMsg* pMsg = pConn->data; if (pMsg == NULL) { - destroy - // handle - return; + // handle + return; } destroyUserdata(&pMsg->msg); } else { @@ -375,6 +376,15 @@ static void clientConnCb(uv_connect_t* req, int status) { clientWrite(pConn); } +static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { + tDebug("thread %p start to quit", pThrd); + destroyCmsg(pMsg); + uv_close((uv_handle_t*)pThrd->cliAsync, NULL); + uv_timer_stop(pThrd->timer); + pThrd->quit = true; + // uv__async_stop(pThrd->cliAsync); + uv_stop(pThrd->loop); +} static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; @@ -389,7 +399,13 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->data = pMsg; conn->writeReq->data = conn; transDestroyBuffer(&conn->readBuf); + + if (pThrd->quit) { + clientHandleExcept(conn); + return; + } clientWrite(conn); + } else { SCliConn* conn = calloc(1, sizeof(SCliConn)); conn->ref++; @@ -430,7 +446,12 @@ static void clientAsyncCb(uv_async_t* handle) { QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - clientHandleReq(pMsg, pThrd); + if (pMsg->ctx == NULL) { + clientHandleQuit(pMsg, pThrd); + } else { + clientHandleReq(pMsg, pThrd); + } + // clientHandleReq(pMsg, pThrd); count++; } if (count >= 2) { @@ -458,7 +479,7 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); if (err == 0) { - tDebug("sucess to create tranport-client thread %d", i); + tDebug("success to create tranport-client thread %d", i); } cli->pThreadObj[i] = pThrd; } @@ -492,20 +513,24 @@ static SCliThrdObj* createThrdObj() { uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); pThrd->cliAsync->data = pThrd; - pThrd->pTimer = malloc(sizeof(uv_timer_t)); - uv_timer_init(pThrd->loop, pThrd->pTimer); - pThrd->pTimer->data = pThrd; + pThrd->timer = malloc(sizeof(uv_timer_t)); + uv_timer_init(pThrd->loop, pThrd->timer); + pThrd->timer->data = pThrd; pThrd->pool = creatConnPool(1); + + pThrd->quit = false; return pThrd; } static void destroyThrdObj(SCliThrdObj* pThrd) { if (pThrd == NULL) { return; } + uv_stop(pThrd->loop); pthread_join(pThrd->thread, NULL); pthread_mutex_destroy(&pThrd->msgMtx); free(pThrd->cliAsync); + free(pThrd->timer); free(pThrd->loop); free(pThrd); } @@ -517,10 +542,22 @@ static void transDestroyConnCtx(STransConnCtx* ctx) { free(ctx); } // +static void clientSendQuit(SCliThrdObj* thrd) { + // cli can stop gracefully + SCliMsg* msg = calloc(1, sizeof(SCliMsg)); + msg->ctx = NULL; // + + pthread_mutex_lock(&thrd->msgMtx); + QUEUE_PUSH(&thrd->msg, &msg->q); + pthread_mutex_unlock(&thrd->msgMtx); + + uv_async_send(thrd->cliAsync); +} void taosCloseClient(void* arg) { // impl later SClientObj* cli = arg; for (int i = 0; i < cli->numOfThreads; i++) { + clientSendQuit(cli->pThreadObj[i]); destroyThrdObj(cli->pThreadObj[i]); } free(cli->pThreadObj); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index a5ee1f1c63..475ef32b46 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -70,6 +70,7 @@ typedef struct SServerObj { uv_pipe_t** pipe; uint32_t ip; uint32_t port; + uv_async_t* pAcceptAsync; // just to quit from from accept thread } SServerObj; static const char* notify = "a"; @@ -88,9 +89,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status); static void uvOnAcceptCb(uv_stream_t* stream, int status); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvWorkerAsyncCb(uv_async_t* handle); +static void uvAcceptAsyncCb(uv_async_t* handle); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); static void uvStartSendResp(SSrvMsg* msg); + static void destroySmsg(SSrvMsg* smsg); // check whether already read complete packet static bool readComplete(SConnBuffer* buf); @@ -389,7 +392,13 @@ void uvWorkerAsyncCb(uv_async_t* handle) { tError("except occurred, continue"); continue; } - uvStartSendResp(msg); + if (msg->pConn == NULL) { + // + free(msg); + uv_stop(pThrd->loop); + } else { + uvStartSendResp(msg); + } // uv_buf_t wb; // uvPrepareSendData(msg, &wb); // uv_timer_stop(conn->pTimer); @@ -397,6 +406,10 @@ void uvWorkerAsyncCb(uv_async_t* handle) { // uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); } } +static void uvAcceptAsyncCb(uv_async_t* async) { + SServerObj* srv = async->data; + uv_stop(srv->loop); +} void uvOnAcceptCb(uv_stream_t* stream, int status) { if (status == -1) { @@ -517,8 +530,12 @@ static bool addHandleToAcceptloop(void* arg) { return false; } - struct sockaddr_in bind_addr; + // register an async here to quit server gracefully + srv->pAcceptAsync = calloc(1, sizeof(uv_async_t)); + uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb); + srv->pAcceptAsync->data = srv; + struct sockaddr_in bind_addr; uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) { tError("failed to bind: %s", uv_err_name(err)); @@ -647,21 +664,42 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { return; } pthread_join(pThrd->thread, NULL); - // free(srv->pipe[i]); free(pThrd->loop); - pthread_mutex_destroy(&pThrd->msgMtx); + free(pThrd->workerAsync); free(pThrd); } +void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { + SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); + + pthread_mutex_lock(&pThrd->msgMtx); + QUEUE_PUSH(&pThrd->msg, &srvMsg->q); + pthread_mutex_unlock(&pThrd->msgMtx); + tDebug("send quit msg to work thread"); + + uv_async_send(pThrd->workerAsync); +} + void taosCloseServer(void* arg) { // impl later SServerObj* srv = arg; for (int i = 0; i < srv->numOfThreads; i++) { + sendQuitToWorkThrd(srv->pThreadObj[i]); destroyWorkThrd(srv->pThreadObj[i]); } - free(srv->loop); - free(srv->pipe); - free(srv->pThreadObj); + + tDebug("send quit msg to accept thread"); + uv_async_send(srv->pAcceptAsync); pthread_join(srv->thread, NULL); + + free(srv->pThreadObj); + free(srv->pAcceptAsync); + free(srv->loop); + + for (int i = 0; i < srv->numOfThreads; i++) { + free(srv->pipe[i]); + } + free(srv->pipe); + free(srv); } diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt index c61f688060..3d9c396336 100644 --- a/source/libs/transport/test/CMakeLists.txt +++ b/source/libs/transport/test/CMakeLists.txt @@ -1,6 +1,12 @@ add_executable(transportTest "") add_executable(client "") add_executable(server "") +add_executable(transUT "") + +target_sources(transUT + PRIVATE + "transUT.cc" +) target_sources(transportTest PRIVATE @@ -28,6 +34,13 @@ target_link_libraries (transportTest gtest_main transport ) +target_link_libraries (transUT + os + util + common + gtest_main + transport +) target_include_directories(client PUBLIC @@ -48,6 +61,13 @@ target_include_directories(server "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(transUT + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/transport" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) + + target_link_libraries (server os util diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc new file mode 100644 index 0000000000..08c683590b --- /dev/null +++ b/source/libs/transport/test/transUT.cc @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include +#include +#include +#include "trpc.h" +using namespace std; + +class TransObj { + public: + TransObj() { + const char *label = "APP"; + const char *secret = "secret"; + const char *user = "user"; + const char *ckey = "ckey"; + + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = (char *)label; + rpcInit.numOfThreads = 5; + rpcInit.cfp = NULL; + rpcInit.sessions = 100; + rpcInit.idleTime = 100; + rpcInit.user = (char *)user; + rpcInit.secret = (char *)secret; + rpcInit.ckey = (char *)ckey; + rpcInit.spi = 1; + } + bool startCli() { + trans = NULL; + rpcInit.connType = TAOS_CONN_CLIENT; + trans = rpcOpen(&rpcInit); + return trans != NULL ? true : false; + } + bool startSrv() { + trans = NULL; + rpcInit.connType = TAOS_CONN_SERVER; + trans = rpcOpen(&rpcInit); + return trans != NULL ? true : false; + } + bool stop() { + rpcClose(trans); + trans = NULL; + return true; + } + + private: + void * trans; + SRpcInit rpcInit; +}; +class TransEnv : public ::testing::Test { + protected: + virtual void SetUp() { + // set up trans obj + tr = new TransObj(); + } + virtual void TearDown() { + // tear down + delete tr; + } + + TransObj *tr = NULL; +}; +TEST_F(TransEnv, test_start_stop) { + assert(tr->startCli()); + assert(tr->stop()); + + assert(tr->startSrv()); + assert(tr->stop()); +} diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index c283423fbf..b785ed2707 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -360,6 +360,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { } } else { int num_rows_affacted = taos_affected_rows(pSql); + taos_free_result(pSql); et = taosGetTimestampUs(); printf("Query OK, %d of %d row(s) in database (%.6fs)\n", num_rows_affacted, num_rows_affacted, (et - st) / 1E6); }