Merge pull request #27416 from taosdata/fix/TD-31412
recover ci sample code
This commit is contained in:
commit
ea24282d14
|
@ -0,0 +1,78 @@
|
|||
PROJECT(TDengine)
|
||||
|
||||
IF (TD_LINUX)
|
||||
INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc)
|
||||
AUX_SOURCE_DIRECTORY(. SRC)
|
||||
|
||||
add_executable(tmq "")
|
||||
add_executable(stream_demo "")
|
||||
add_executable(schemaless "")
|
||||
add_executable(prepare "")
|
||||
add_executable(demo "")
|
||||
add_executable(asyncdemo "")
|
||||
|
||||
target_sources(tmq
|
||||
PRIVATE
|
||||
"tmq.c"
|
||||
)
|
||||
|
||||
target_sources(stream_demo
|
||||
PRIVATE
|
||||
"stream_demo.c"
|
||||
)
|
||||
|
||||
target_sources(schemaless
|
||||
PRIVATE
|
||||
"schemaless.c"
|
||||
)
|
||||
|
||||
target_sources(prepare
|
||||
PRIVATE
|
||||
"prepare.c"
|
||||
)
|
||||
|
||||
target_sources(demo
|
||||
PRIVATE
|
||||
"demo.c"
|
||||
)
|
||||
|
||||
target_sources(asyncdemo
|
||||
PRIVATE
|
||||
"asyncdemo.c"
|
||||
)
|
||||
|
||||
target_link_libraries(tmq
|
||||
taos
|
||||
)
|
||||
|
||||
target_link_libraries(stream_demo
|
||||
taos
|
||||
)
|
||||
|
||||
target_link_libraries(schemaless
|
||||
taos
|
||||
)
|
||||
|
||||
target_link_libraries(prepare
|
||||
taos
|
||||
)
|
||||
|
||||
target_link_libraries(demo
|
||||
taos
|
||||
)
|
||||
|
||||
target_link_libraries(asyncdemo
|
||||
taos
|
||||
)
|
||||
|
||||
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)
|
||||
SET_TARGET_PROPERTIES(stream_demo PROPERTIES OUTPUT_NAME stream_demo)
|
||||
SET_TARGET_PROPERTIES(schemaless PROPERTIES OUTPUT_NAME schemaless)
|
||||
SET_TARGET_PROPERTIES(prepare PROPERTIES OUTPUT_NAME prepare)
|
||||
SET_TARGET_PROPERTIES(demo PROPERTIES OUTPUT_NAME demo)
|
||||
SET_TARGET_PROPERTIES(asyncdemo PROPERTIES OUTPUT_NAME asyncdemo)
|
||||
ENDIF ()
|
||||
IF (TD_DARWIN)
|
||||
INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc)
|
||||
AUX_SOURCE_DIRECTORY(. SRC)
|
||||
ENDIF ()
|
|
@ -0,0 +1,293 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// TAOS asynchronous API example
|
||||
// this example opens multiple tables, insert/retrieve multiple tables
|
||||
// it is used by TAOS internally for one performance testing
|
||||
// to compiple: gcc -o asyncdemo asyncdemo.c -ltaos
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/time.h>
|
||||
#include <unistd.h>
|
||||
#include <string.h>
|
||||
#include <inttypes.h>
|
||||
#include "taos.h"
|
||||
|
||||
int points = 5;
|
||||
int numOfTables = 3;
|
||||
int tablesInsertProcessed = 0;
|
||||
int tablesSelectProcessed = 0;
|
||||
int64_t st, et;
|
||||
|
||||
typedef struct {
|
||||
int id;
|
||||
TAOS *taos;
|
||||
char name[32];
|
||||
time_t timeStamp;
|
||||
int value;
|
||||
int rowsInserted;
|
||||
int rowsTried;
|
||||
int rowsRetrieved;
|
||||
} STable;
|
||||
|
||||
void taos_insert_call_back(void *param, TAOS_RES *tres, int code);
|
||||
void taos_select_call_back(void *param, TAOS_RES *tres, int code);
|
||||
void shellPrintError(TAOS *taos);
|
||||
|
||||
static void queryDB(TAOS *taos, char *command) {
|
||||
int i;
|
||||
TAOS_RES *pSql = NULL;
|
||||
int32_t code = -1;
|
||||
|
||||
for (i = 0; i < 5; i++) {
|
||||
if (NULL != pSql) {
|
||||
taos_free_result(pSql);
|
||||
pSql = NULL;
|
||||
}
|
||||
|
||||
pSql = taos_query(taos, command);
|
||||
code = taos_errno(pSql);
|
||||
if (0 == code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql));
|
||||
taos_free_result(pSql);
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
taos_free_result(pSql);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
TAOS *taos;
|
||||
struct timeval systemTime;
|
||||
int i;
|
||||
char sql[1024] = { 0 };
|
||||
char prefix[20] = { 0 };
|
||||
char db[128] = { 0 };
|
||||
STable *tableList;
|
||||
|
||||
if (argc != 5) {
|
||||
printf("usage: %s server-ip dbname rowsPerTable numOfTables\n", argv[0]);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
// a simple way to parse input parameters
|
||||
if (argc >= 3) strncpy(db, argv[2], sizeof(db) - 1);
|
||||
if (argc >= 4) points = atoi(argv[3]);
|
||||
if (argc >= 5) numOfTables = atoi(argv[4]);
|
||||
|
||||
size_t size = sizeof(STable) * (size_t)numOfTables;
|
||||
tableList = (STable *)malloc(size);
|
||||
memset(tableList, 0, size);
|
||||
|
||||
taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
|
||||
if (taos == NULL)
|
||||
shellPrintError(taos);
|
||||
|
||||
printf("success to connect to server\n");
|
||||
|
||||
sprintf(sql, "drop database if exists %s", db);
|
||||
queryDB(taos, sql);
|
||||
|
||||
sprintf(sql, "create database %s", db);
|
||||
queryDB(taos, sql);
|
||||
|
||||
sprintf(sql, "use %s", db);
|
||||
queryDB(taos, sql);
|
||||
|
||||
strcpy(prefix, "asytbl_");
|
||||
for (i = 0; i < numOfTables; ++i) {
|
||||
tableList[i].id = i;
|
||||
tableList[i].taos = taos;
|
||||
sprintf(tableList[i].name, "%s%d", prefix, i);
|
||||
sprintf(sql, "create table %s%d (ts timestamp, volume bigint)", prefix, i);
|
||||
queryDB(taos, sql);
|
||||
}
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
for (i = 0; i < numOfTables; ++i)
|
||||
tableList[i].timeStamp = (time_t)(systemTime.tv_sec) * 1000 + systemTime.tv_usec / 1000;
|
||||
|
||||
printf("success to create tables, press any key to insert\n");
|
||||
getchar();
|
||||
|
||||
printf("start to insert...\n");
|
||||
gettimeofday(&systemTime, NULL);
|
||||
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
|
||||
tablesInsertProcessed = 0;
|
||||
tablesSelectProcessed = 0;
|
||||
|
||||
for (i = 0; i<numOfTables; ++i) {
|
||||
// insert records in asynchronous API
|
||||
sprintf(sql, "insert into %s values(%ld, 0)", tableList[i].name, 1546300800000 + i);
|
||||
taos_query_a(taos, sql, taos_insert_call_back, (void *)(tableList + i));
|
||||
}
|
||||
|
||||
printf("once insert finished, presse any key to query\n");
|
||||
getchar();
|
||||
|
||||
while(1) {
|
||||
if (tablesInsertProcessed < numOfTables) {
|
||||
printf("wait for process finished\n");
|
||||
sleep(1);
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
printf("start to query...\n");
|
||||
gettimeofday(&systemTime, NULL);
|
||||
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
|
||||
|
||||
for (i = 0; i < numOfTables; ++i) {
|
||||
// select records in asynchronous API
|
||||
sprintf(sql, "select * from %s", tableList[i].name);
|
||||
taos_query_a(taos, sql, taos_select_call_back, (void *)(tableList + i));
|
||||
}
|
||||
|
||||
printf("\nonce finished, press any key to exit\n");
|
||||
getchar();
|
||||
|
||||
while(1) {
|
||||
if (tablesSelectProcessed < numOfTables) {
|
||||
printf("wait for process finished\n");
|
||||
sleep(1);
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
for (i = 0; i<numOfTables; ++i) {
|
||||
printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
|
||||
}
|
||||
|
||||
taos_close(taos);
|
||||
free(tableList);
|
||||
|
||||
printf("==== async demo end====\n");
|
||||
printf("\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void shellPrintError(TAOS *con)
|
||||
{
|
||||
fprintf(stderr, "TDengine error: %s\n", taos_errstr(con));
|
||||
taos_close(con);
|
||||
taos_cleanup();
|
||||
exit(1);
|
||||
}
|
||||
|
||||
void taos_insert_call_back(void *param, TAOS_RES *tres, int code)
|
||||
{
|
||||
STable *pTable = (STable *)param;
|
||||
struct timeval systemTime;
|
||||
char sql[128];
|
||||
|
||||
pTable->rowsTried++;
|
||||
|
||||
if (code < 0) {
|
||||
printf("%s insert failed, code:%d, rows:%d\n", pTable->name, code, pTable->rowsTried);
|
||||
}
|
||||
else if (code == 0) {
|
||||
printf("%s not inserted\n", pTable->name);
|
||||
}
|
||||
else {
|
||||
pTable->rowsInserted++;
|
||||
}
|
||||
|
||||
if (pTable->rowsTried < points) {
|
||||
// for this demo, insert another record
|
||||
sprintf(sql, "insert into %s values(%ld, %d)", pTable->name, 1546300800000+pTable->rowsTried*1000, pTable->rowsTried);
|
||||
taos_query_a(pTable->taos, sql, taos_insert_call_back, (void *)pTable);
|
||||
}
|
||||
else {
|
||||
printf("%d rows data are inserted into %s\n", points, pTable->name);
|
||||
tablesInsertProcessed++;
|
||||
if (tablesInsertProcessed >= numOfTables) {
|
||||
gettimeofday(&systemTime, NULL);
|
||||
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
printf("%" PRId64 " mseconds to insert %d data points\n", (et - st) / 1000, points*numOfTables);
|
||||
}
|
||||
}
|
||||
|
||||
taos_free_result(tres);
|
||||
}
|
||||
|
||||
void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
|
||||
{
|
||||
STable *pTable = (STable *)param;
|
||||
struct timeval systemTime;
|
||||
|
||||
if (numOfRows > 0) {
|
||||
|
||||
for (int i = 0; i<numOfRows; ++i) {
|
||||
// synchronous API to retrieve a row from batch of records
|
||||
/*TAOS_ROW row = */(void)taos_fetch_row(tres);
|
||||
// process row
|
||||
}
|
||||
|
||||
pTable->rowsRetrieved += numOfRows;
|
||||
|
||||
// retrieve next batch of rows
|
||||
taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable);
|
||||
|
||||
}
|
||||
else {
|
||||
if (numOfRows < 0)
|
||||
printf("%s retrieve failed, code:%d\n", pTable->name, numOfRows);
|
||||
|
||||
//taos_free_result(tres);
|
||||
printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name);
|
||||
|
||||
tablesSelectProcessed++;
|
||||
if (tablesSelectProcessed >= numOfTables) {
|
||||
gettimeofday(&systemTime, NULL);
|
||||
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
printf("%" PRId64 " mseconds to query %d data rows\n", (et - st) / 1000, points * numOfTables);
|
||||
}
|
||||
|
||||
taos_free_result(tres);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
void taos_select_call_back(void *param, TAOS_RES *tres, int code)
|
||||
{
|
||||
STable *pTable = (STable *)param;
|
||||
|
||||
if (code == 0 && tres) {
|
||||
// asynchronous API to fetch a batch of records
|
||||
taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable);
|
||||
}
|
||||
else {
|
||||
printf("%s select failed, code:%d\n", pTable->name, code);
|
||||
taos_free_result(tres);
|
||||
taos_cleanup();
|
||||
exit(1);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,133 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// TAOS standard API example. The same syntax as MySQL, but only a subset
|
||||
// to compile: gcc -o demo demo.c -ltaos
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include "taos.h" // TAOS header file
|
||||
|
||||
static void queryDB(TAOS *taos, char *command) {
|
||||
int i;
|
||||
TAOS_RES *pSql = NULL;
|
||||
int32_t code = -1;
|
||||
|
||||
for (i = 0; i < 5; i++) {
|
||||
if (NULL != pSql) {
|
||||
taos_free_result(pSql);
|
||||
pSql = NULL;
|
||||
}
|
||||
|
||||
pSql = taos_query(taos, command);
|
||||
code = taos_errno(pSql);
|
||||
if (0 == code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql));
|
||||
taos_free_result(pSql);
|
||||
taos_close(taos);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
taos_free_result(pSql);
|
||||
}
|
||||
|
||||
void Test(TAOS *taos, char *qstr, int i);
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
char qstr[1024];
|
||||
|
||||
// connect to server
|
||||
if (argc < 2) {
|
||||
printf("please input server-ip \n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
|
||||
if (taos == NULL) {
|
||||
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
|
||||
exit(1);
|
||||
}
|
||||
for (int i = 0; i < 100; i++) {
|
||||
Test(taos, qstr, i);
|
||||
}
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
}
|
||||
void Test(TAOS *taos, char *qstr, int index) {
|
||||
printf("==================test at %d\n================================", index);
|
||||
queryDB(taos, "drop database if exists demo");
|
||||
queryDB(taos, "create database demo");
|
||||
TAOS_RES *result;
|
||||
queryDB(taos, "use demo");
|
||||
|
||||
queryDB(taos, "create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))");
|
||||
printf("success to create table\n");
|
||||
|
||||
int i = 0;
|
||||
for (i = 0; i < 10; ++i) {
|
||||
sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", (uint64_t)(1546300800000 + i * 1000), i, i, i, i*10000000, i*1.0, i*2.0, "hello");
|
||||
printf("qstr: %s\n", qstr);
|
||||
|
||||
// note: how do you wanna do if taos_query returns non-NULL
|
||||
// if (taos_query(taos, qstr)) {
|
||||
// printf("insert row: %i, reason:%s\n", i, taos_errstr(taos));
|
||||
// }
|
||||
TAOS_RES *result1 = taos_query(taos, qstr);
|
||||
if (result1 == NULL || taos_errno(result1) != 0) {
|
||||
printf("failed to insert row, reason:%s\n", taos_errstr(result1));
|
||||
taos_free_result(result1);
|
||||
exit(1);
|
||||
} else {
|
||||
printf("insert row: %i\n", i);
|
||||
}
|
||||
taos_free_result(result1);
|
||||
}
|
||||
printf("success to insert rows, total %d rows\n", i);
|
||||
|
||||
// query the records
|
||||
sprintf(qstr, "SELECT * FROM m1");
|
||||
result = taos_query(taos, qstr);
|
||||
if (result == NULL || taos_errno(result) != 0) {
|
||||
printf("failed to select, reason:%s\n", taos_errstr(result));
|
||||
taos_free_result(result);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
TAOS_ROW row;
|
||||
int rows = 0;
|
||||
int num_fields = taos_field_count(result);
|
||||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||
|
||||
printf("num_fields = %d\n", num_fields);
|
||||
printf("select * from table, result:\n");
|
||||
// fetch the records row by row
|
||||
while ((row = taos_fetch_row(result))) {
|
||||
char temp[1024] = {0};
|
||||
rows++;
|
||||
taos_print_row(temp, row, fields, num_fields);
|
||||
printf("%s\n", temp);
|
||||
}
|
||||
|
||||
taos_free_result(result);
|
||||
printf("====demo end====\n\n");
|
||||
}
|
||||
|
|
@ -0,0 +1,235 @@
|
|||
// TAOS standard API example. The same syntax as MySQL, but only a subet
|
||||
// to compile: gcc -o prepare prepare.c -ltaos
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include "taos.h"
|
||||
|
||||
void taosMsleep(int mseconds);
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
TAOS *taos;
|
||||
TAOS_RES *result;
|
||||
int code;
|
||||
TAOS_STMT *stmt;
|
||||
|
||||
// connect to server
|
||||
if (argc < 2) {
|
||||
printf("please input server ip \n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
|
||||
if (taos == NULL) {
|
||||
printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
result = taos_query(taos, "drop database demo");
|
||||
taos_free_result(result);
|
||||
|
||||
result = taos_query(taos, "create database demo");
|
||||
code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to create database, reason:%s\n", taos_errstr(result));
|
||||
taos_free_result(result);
|
||||
exit(1);
|
||||
}
|
||||
taos_free_result(result);
|
||||
|
||||
result = taos_query(taos, "use demo");
|
||||
taos_free_result(result);
|
||||
|
||||
// create table
|
||||
const char* sql = "create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), blob nchar(10), varbin varbinary(16))";
|
||||
result = taos_query(taos, sql);
|
||||
code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to create table, reason:%s\n", taos_errstr(result));
|
||||
taos_free_result(result);
|
||||
exit(1);
|
||||
}
|
||||
taos_free_result(result);
|
||||
|
||||
// sleep for one second to make sure table is created on data node
|
||||
// taosMsleep(1000);
|
||||
|
||||
// insert 10 records
|
||||
struct {
|
||||
int64_t ts;
|
||||
int8_t b;
|
||||
int8_t v1;
|
||||
int16_t v2;
|
||||
int32_t v4;
|
||||
int64_t v8;
|
||||
float f4;
|
||||
double f8;
|
||||
char bin[40];
|
||||
char blob[80];
|
||||
int8_t varbin[16];
|
||||
} v = {0};
|
||||
|
||||
int32_t boolLen = sizeof(int8_t);
|
||||
int32_t sintLen = sizeof(int16_t);
|
||||
int32_t intLen = sizeof(int32_t);
|
||||
int32_t bintLen = sizeof(int64_t);
|
||||
int32_t floatLen = sizeof(float);
|
||||
int32_t doubleLen = sizeof(double);
|
||||
int32_t binLen = sizeof(v.bin);
|
||||
int32_t ncharLen = 30;
|
||||
|
||||
stmt = taos_stmt_init(taos);
|
||||
TAOS_MULTI_BIND params[11];
|
||||
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
params[0].buffer_length = sizeof(v.ts);
|
||||
params[0].buffer = &v.ts;
|
||||
params[0].length = &bintLen;
|
||||
params[0].is_null = NULL;
|
||||
params[0].num = 1;
|
||||
|
||||
params[1].buffer_type = TSDB_DATA_TYPE_BOOL;
|
||||
params[1].buffer_length = sizeof(v.b);
|
||||
params[1].buffer = &v.b;
|
||||
params[1].length = &boolLen;
|
||||
params[1].is_null = NULL;
|
||||
params[1].num = 1;
|
||||
|
||||
params[2].buffer_type = TSDB_DATA_TYPE_TINYINT;
|
||||
params[2].buffer_length = sizeof(v.v1);
|
||||
params[2].buffer = &v.v1;
|
||||
params[2].length = &boolLen;
|
||||
params[2].is_null = NULL;
|
||||
params[2].num = 1;
|
||||
|
||||
params[3].buffer_type = TSDB_DATA_TYPE_SMALLINT;
|
||||
params[3].buffer_length = sizeof(v.v2);
|
||||
params[3].buffer = &v.v2;
|
||||
params[3].length = &sintLen;
|
||||
params[3].is_null = NULL;
|
||||
params[3].num = 1;
|
||||
|
||||
params[4].buffer_type = TSDB_DATA_TYPE_INT;
|
||||
params[4].buffer_length = sizeof(v.v4);
|
||||
params[4].buffer = &v.v4;
|
||||
params[4].length = &intLen;
|
||||
params[4].is_null = NULL;
|
||||
params[4].num = 1;
|
||||
|
||||
params[5].buffer_type = TSDB_DATA_TYPE_BIGINT;
|
||||
params[5].buffer_length = sizeof(v.v8);
|
||||
params[5].buffer = &v.v8;
|
||||
params[5].length = &bintLen;
|
||||
params[5].is_null = NULL;
|
||||
params[5].num = 1;
|
||||
|
||||
params[6].buffer_type = TSDB_DATA_TYPE_FLOAT;
|
||||
params[6].buffer_length = sizeof(v.f4);
|
||||
params[6].buffer = &v.f4;
|
||||
params[6].length = &floatLen;
|
||||
params[6].is_null = NULL;
|
||||
params[6].num = 1;
|
||||
|
||||
params[7].buffer_type = TSDB_DATA_TYPE_DOUBLE;
|
||||
params[7].buffer_length = sizeof(v.f8);
|
||||
params[7].buffer = &v.f8;
|
||||
params[7].length = &doubleLen;
|
||||
params[7].is_null = NULL;
|
||||
params[7].num = 1;
|
||||
|
||||
params[8].buffer_type = TSDB_DATA_TYPE_BINARY;
|
||||
params[8].buffer_length = sizeof(v.bin);
|
||||
params[8].buffer = v.bin;
|
||||
params[8].length = &binLen;
|
||||
params[8].is_null = NULL;
|
||||
params[8].num = 1;
|
||||
|
||||
strcpy(v.blob, "一二三四五六七八九十");
|
||||
params[9].buffer_type = TSDB_DATA_TYPE_NCHAR;
|
||||
params[9].buffer_length = sizeof(v.blob);
|
||||
params[9].buffer = v.blob;
|
||||
params[9].length = &ncharLen;
|
||||
params[9].is_null = NULL;
|
||||
params[9].num = 1;
|
||||
|
||||
int8_t tmp[16] = {'a', 0, 1, 13, '1'};
|
||||
int32_t vbinLen = 5;
|
||||
memcpy(v.varbin, tmp, sizeof(v.varbin));
|
||||
params[10].buffer_type = TSDB_DATA_TYPE_VARBINARY;
|
||||
params[10].buffer_length = sizeof(v.varbin);
|
||||
params[10].buffer = v.varbin;
|
||||
params[10].length = &vbinLen;
|
||||
params[10].is_null = NULL;
|
||||
params[10].num = 1;
|
||||
|
||||
char is_null = 1;
|
||||
|
||||
sql = "insert into m1 values(?,?,?,?,?,?,?,?,?,?,?)";
|
||||
code = taos_stmt_prepare(stmt, sql, 0);
|
||||
if (code != 0){
|
||||
printf("failed to execute taos_stmt_prepare. code:0x%x\n", code);
|
||||
}
|
||||
v.ts = 1591060628000;
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
v.ts += 1;
|
||||
for (int j = 1; j < 11; ++j) {
|
||||
params[j].is_null = ((i == j) ? &is_null : 0);
|
||||
}
|
||||
v.b = (int8_t)i % 2;
|
||||
v.v1 = (int8_t)i;
|
||||
v.v2 = (int16_t)(i * 2);
|
||||
v.v4 = (int32_t)(i * 4);
|
||||
v.v8 = (int64_t)(i * 8);
|
||||
v.f4 = (float)(i * 40);
|
||||
v.f8 = (double)(i * 80);
|
||||
for (int j = 0; j < sizeof(v.bin); ++j) {
|
||||
v.bin[j] = (char)(i + '0');
|
||||
}
|
||||
|
||||
taos_stmt_bind_param(stmt, params);
|
||||
taos_stmt_add_batch(stmt);
|
||||
}
|
||||
if (taos_stmt_execute(stmt) != 0) {
|
||||
printf("failed to execute insert statement.\n");
|
||||
exit(1);
|
||||
}
|
||||
taos_stmt_close(stmt);
|
||||
|
||||
// query the records
|
||||
stmt = taos_stmt_init(taos);
|
||||
taos_stmt_prepare(stmt, "SELECT * FROM m1 WHERE v1 > ? AND v2 < ?", 0);
|
||||
v.v1 = 5;
|
||||
v.v2 = 15;
|
||||
taos_stmt_bind_param(stmt, params + 2);
|
||||
if (taos_stmt_execute(stmt) != 0) {
|
||||
printf("failed to execute select statement.\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
result = taos_stmt_use_result(stmt);
|
||||
|
||||
TAOS_ROW row;
|
||||
int rows = 0;
|
||||
int num_fields = taos_num_fields(result);
|
||||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||
|
||||
// fetch the records row by row
|
||||
while ((row = taos_fetch_row(result))) {
|
||||
char temp[256] = {0};
|
||||
rows++;
|
||||
taos_print_row(temp, row, fields, num_fields);
|
||||
printf("%s\n", temp);
|
||||
}
|
||||
if (rows == 2) {
|
||||
printf("two rows are fetched as expectation\n");
|
||||
} else {
|
||||
printf("expect two rows, but %d rows are fetched\n", rows);
|
||||
}
|
||||
|
||||
// taos_free_result(result);
|
||||
taos_stmt_close(stmt);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -0,0 +1,73 @@
|
|||
#include "taos.h"
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/time.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
#include <inttypes.h>
|
||||
#include <string.h>
|
||||
|
||||
|
||||
int numSuperTables = 8;
|
||||
int numChildTables = 4;
|
||||
int numRowsPerChildTable = 2048;
|
||||
|
||||
static int64_t getTimeInUs() {
|
||||
struct timeval systemTime;
|
||||
gettimeofday(&systemTime, NULL);
|
||||
return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec;
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
TAOS_RES *result;
|
||||
const char* host = "127.0.0.1";
|
||||
const char* user = "root";
|
||||
const char* passwd = "taosdata";
|
||||
|
||||
taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
|
||||
TAOS* taos = taos_connect(host, user, passwd, "", 0);
|
||||
if (taos == NULL) {
|
||||
printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
const char* info = taos_get_server_info(taos);
|
||||
printf("server info: %s\n", info);
|
||||
info = taos_get_client_info(taos);
|
||||
printf("client info: %s\n", info);
|
||||
result = taos_query(taos, "drop database if exists db;");
|
||||
taos_free_result(result);
|
||||
usleep(100000);
|
||||
result = taos_query(taos, "create database db precision 'ms';");
|
||||
taos_free_result(result);
|
||||
usleep(100000);
|
||||
|
||||
(void)taos_select_db(taos, "db");
|
||||
|
||||
time_t ct = time(0);
|
||||
int64_t ts = ct * 1000;
|
||||
char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %" PRId64;
|
||||
|
||||
int lineNum = numSuperTables * numChildTables * numRowsPerChildTable;
|
||||
char** lines = calloc((size_t)lineNum, sizeof(char*));
|
||||
int l = 0;
|
||||
for (int i = 0; i < numSuperTables; ++i) {
|
||||
for (int j = 0; j < numChildTables; ++j) {
|
||||
for (int k = 0; k < numRowsPerChildTable; ++k) {
|
||||
char* line = calloc(512, 1);
|
||||
snprintf(line, 512, lineFormat, i, j, ts + 10 * l);
|
||||
lines[l] = line;
|
||||
++l;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
printf("%s\n", "begin taos_insert_lines");
|
||||
int64_t begin = getTimeInUs();
|
||||
TAOS_RES *res = taos_schemaless_insert(taos, lines, lineNum, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
|
||||
int64_t end = getTimeInUs();
|
||||
printf("code: %s. time used: %" PRId64 "\n", taos_errstr(res), end-begin);
|
||||
taos_free_result(res);
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// clang-format off
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include "taos.h"
|
||||
|
||||
int32_t init_env() {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
#if 0
|
||||
pRes = taos_query(pConn, "create database if not exists abc2 vgroups 20");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
#endif
|
||||
|
||||
pRes = taos_query(pConn, "use abc1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int, j varchar(20)) tags(a varchar(20))");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags('c1')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags('c2')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table if not exists tu3 using st1 tags('c3')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t create_stream() {
|
||||
printf("create stream\n");
|
||||
TAOS_RES* pRes;
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRes = taos_query(pConn, "use abc1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn,
|
||||
/*"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)");*/
|
||||
"create stream stream2 into outstb subtable(concat(concat(concat('prefix_', tname), '_suffix_'), cast(k1 as varchar(20)))) as select _wstart wstart, avg(k) from st1 partition by tbname tname, a k1 interval(10s);");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
if (argc > 1) {
|
||||
printf("env init\n");
|
||||
int code = init_env();
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
}
|
||||
create_stream();
|
||||
}
|
|
@ -0,0 +1,333 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include "taos.h"
|
||||
|
||||
static int running = 1;
|
||||
const char* topic_name = "topicname";
|
||||
|
||||
static int32_t msg_process(TAOS_RES* msg) {
|
||||
char buf[1024];
|
||||
int32_t rows = 0;
|
||||
|
||||
const char* topicName = tmq_get_topic_name(msg);
|
||||
const char* dbName = tmq_get_db_name(msg);
|
||||
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||
|
||||
printf("topic: %s\n", topicName);
|
||||
printf("db: %s\n", dbName);
|
||||
printf("vgroup id: %d\n", vgroupId);
|
||||
|
||||
while (1) {
|
||||
TAOS_ROW row = taos_fetch_row(msg);
|
||||
if (row == NULL) break;
|
||||
|
||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||
int32_t numOfFields = taos_field_count(msg);
|
||||
// int32_t* length = taos_fetch_lengths(msg);
|
||||
int32_t precision = taos_result_precision(msg);
|
||||
rows++;
|
||||
taos_print_row(buf, row, fields, numOfFields);
|
||||
printf("precision: %d, row content: %s\n", precision, buf);
|
||||
}
|
||||
|
||||
return rows;
|
||||
}
|
||||
|
||||
static int32_t init_env() {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes;
|
||||
// drop database if exists
|
||||
printf("create database\n");
|
||||
pRes = taos_query(pConn, "drop topic topicname");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop topicname, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "drop database if exists tmqdb");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create database
|
||||
pRes = taos_query(pConn, "create database tmqdb precision 'ns' WAL_RETENTION_PERIOD 3600");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create super table
|
||||
printf("create super table\n");
|
||||
pRes = taos_query(
|
||||
pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create sub tables
|
||||
printf("create sub tables\n");
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// insert data
|
||||
printf("insert data into sub tables\n");
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
|
||||
END:
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t create_topic() {
|
||||
printf("create topic\n");
|
||||
TAOS_RES* pRes;
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRes = taos_query(pConn, "use tmqdb");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3, tbname from tmqdb.stb where c1 > 1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||
printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
|
||||
}
|
||||
|
||||
tmq_t* build_consumer() {
|
||||
tmq_conf_res_t code;
|
||||
tmq_t* tmq = NULL;
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
code = tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "group.id", "cgrpName");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "client.id", "user defined name");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "td.connect.user", "root");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
|
||||
_end:
|
||||
tmq_conf_destroy(conf);
|
||||
return tmq;
|
||||
}
|
||||
|
||||
tmq_list_t* build_topic_list() {
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
int32_t code = tmq_list_append(topicList, topic_name);
|
||||
if (code) {
|
||||
tmq_list_destroy(topicList);
|
||||
return NULL;
|
||||
}
|
||||
return topicList;
|
||||
}
|
||||
|
||||
void basic_consume_loop(tmq_t* tmq) {
|
||||
int32_t totalRows = 0;
|
||||
int32_t msgCnt = 0;
|
||||
int32_t timeout = 5000;
|
||||
while (running) {
|
||||
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
|
||||
if (tmqmsg) {
|
||||
msgCnt++;
|
||||
totalRows += msg_process(tmqmsg);
|
||||
taos_free_result(tmqmsg);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
}
|
||||
|
||||
void consume_repeatly(tmq_t* tmq) {
|
||||
int32_t numOfAssignment = 0;
|
||||
tmq_topic_assignment* pAssign = NULL;
|
||||
|
||||
int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "failed to get assignment, reason:%s", tmq_err2str(code));
|
||||
}
|
||||
|
||||
// seek to the earliest offset
|
||||
for(int32_t i = 0; i < numOfAssignment; ++i) {
|
||||
tmq_topic_assignment* p = &pAssign[i];
|
||||
|
||||
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "failed to seek to %d, reason:%s", (int)p->begin, tmq_err2str(code));
|
||||
}
|
||||
}
|
||||
|
||||
tmq_free_assignment(pAssign);
|
||||
|
||||
// let's do it again
|
||||
basic_consume_loop(tmq);
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
int32_t code;
|
||||
|
||||
if (init_env() < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (create_topic() < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmq_t* tmq = build_consumer();
|
||||
if (NULL == tmq) {
|
||||
fprintf(stderr, "build_consumer() fail!\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
if (NULL == topic_list) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if ((code = tmq_subscribe(tmq, topic_list))) {
|
||||
fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
|
||||
}
|
||||
|
||||
tmq_list_destroy(topic_list);
|
||||
|
||||
basic_consume_loop(tmq);
|
||||
|
||||
consume_repeatly(tmq);
|
||||
|
||||
code = tmq_consumer_close(tmq);
|
||||
if (code) {
|
||||
fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code));
|
||||
} else {
|
||||
fprintf(stderr, "Consumer closed\n");
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,2 @@
|
|||
/target
|
||||
Cargo.lock
|
|
@ -0,0 +1,18 @@
|
|||
[package]
|
||||
name = "rust"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
taos = "*"
|
||||
|
||||
[dev-dependencies]
|
||||
chrono = "0.4"
|
||||
itertools = "0.10.3"
|
||||
pretty_env_logger = "0.4.0"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
anyhow = "1"
|
|
@ -0,0 +1,80 @@
|
|||
use anyhow::Result;
|
||||
use serde::Deserialize;
|
||||
use taos::*;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let taos = TaosBuilder::from_dsn("taos://")?.build()?;
|
||||
taos.exec_many([
|
||||
"drop database if exists test",
|
||||
"create database test keep 36500",
|
||||
"use test",
|
||||
"create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,
|
||||
c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned,
|
||||
c10 float, c11 double, c12 varchar(100), c13 nchar(100)) tags(t1 varchar(100))",
|
||||
])
|
||||
.await?;
|
||||
let mut stmt = Stmt::init(&taos)?;
|
||||
stmt.prepare(
|
||||
"insert into ? using tb1 tags(?) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
)?;
|
||||
stmt.set_tbname("d0")?;
|
||||
stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;
|
||||
|
||||
let params = vec![
|
||||
ColumnView::from_millis_timestamp(vec![164000000000]),
|
||||
ColumnView::from_bools(vec![true]),
|
||||
ColumnView::from_tiny_ints(vec![i8::MAX]),
|
||||
ColumnView::from_small_ints(vec![i16::MAX]),
|
||||
ColumnView::from_ints(vec![i32::MAX]),
|
||||
ColumnView::from_big_ints(vec![i64::MAX]),
|
||||
ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
|
||||
ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
|
||||
ColumnView::from_unsigned_ints(vec![u32::MAX]),
|
||||
ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
|
||||
ColumnView::from_floats(vec![f32::MAX]),
|
||||
ColumnView::from_doubles(vec![f64::MAX]),
|
||||
ColumnView::from_varchar(vec!["ABC"]),
|
||||
ColumnView::from_nchar(vec!["涛思数据"]),
|
||||
];
|
||||
let rows = stmt.bind(¶ms)?.add_batch()?.execute()?;
|
||||
assert_eq!(rows, 1);
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[allow(dead_code)]
|
||||
struct Row {
|
||||
ts: String,
|
||||
c1: bool,
|
||||
c2: i8,
|
||||
c3: i16,
|
||||
c4: i32,
|
||||
c5: i64,
|
||||
c6: u8,
|
||||
c7: u16,
|
||||
c8: u32,
|
||||
c9: u64,
|
||||
c10: Option<f32>,
|
||||
c11: f64,
|
||||
c12: String,
|
||||
c13: String,
|
||||
t1: serde_json::Value,
|
||||
}
|
||||
|
||||
let rows: Vec<Row> = taos
|
||||
.query("select * from tb1")
|
||||
.await?
|
||||
.deserialize()
|
||||
.try_collect()
|
||||
.await?;
|
||||
let row = &rows[0];
|
||||
dbg!(&row);
|
||||
assert_eq!(row.c5, i64::MAX);
|
||||
assert_eq!(row.c8, u32::MAX);
|
||||
assert_eq!(row.c9, u64::MAX);
|
||||
assert_eq!(row.c10.unwrap(), f32::MAX);
|
||||
// assert_eq!(row.c11, f64::MAX);
|
||||
assert_eq!(row.c12, "ABC");
|
||||
assert_eq!(row.c13, "涛思数据");
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
use anyhow::Result;
|
||||
use serde::Deserialize;
|
||||
use taos::*;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let taos = TaosBuilder::from_dsn("taos://")?.build()?;
|
||||
taos.exec_many([
|
||||
"drop database if exists test_bindable",
|
||||
"create database test_bindable keep 36500",
|
||||
"use test_bindable",
|
||||
"create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint,
|
||||
c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned,
|
||||
c10 float, c11 double, c12 varchar(100), c13 nchar(100))",
|
||||
])
|
||||
.await?;
|
||||
let mut stmt = Stmt::init(&taos)?;
|
||||
stmt.prepare("insert into tb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")?;
|
||||
let params = vec![
|
||||
ColumnView::from_millis_timestamp(vec![0]),
|
||||
ColumnView::from_bools(vec![true]),
|
||||
ColumnView::from_tiny_ints(vec![i8::MAX]),
|
||||
ColumnView::from_small_ints(vec![i16::MAX]),
|
||||
ColumnView::from_ints(vec![i32::MAX]),
|
||||
ColumnView::from_big_ints(vec![i64::MAX]),
|
||||
ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
|
||||
ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
|
||||
ColumnView::from_unsigned_ints(vec![u32::MAX]),
|
||||
ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
|
||||
ColumnView::from_floats(vec![f32::MAX]),
|
||||
ColumnView::from_doubles(vec![f64::MAX]),
|
||||
ColumnView::from_varchar(vec!["ABC"]),
|
||||
ColumnView::from_nchar(vec!["涛思数据"]),
|
||||
];
|
||||
let rows = stmt.bind(¶ms)?.add_batch()?.execute()?;
|
||||
assert_eq!(rows, 1);
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[allow(dead_code)]
|
||||
struct Row {
|
||||
ts: String,
|
||||
c1: bool,
|
||||
c2: i8,
|
||||
c3: i16,
|
||||
c4: i32,
|
||||
c5: i64,
|
||||
c6: u8,
|
||||
c7: u16,
|
||||
c8: u32,
|
||||
c9: u64,
|
||||
c10: Option<f32>,
|
||||
c11: f64,
|
||||
c12: String,
|
||||
c13: String,
|
||||
}
|
||||
|
||||
let rows: Vec<Row> = taos
|
||||
.query("select * from tb1")
|
||||
.await?
|
||||
.deserialize()
|
||||
.try_collect()
|
||||
.await?;
|
||||
let row = &rows[0];
|
||||
dbg!(&row);
|
||||
assert_eq!(row.c5, i64::MAX);
|
||||
assert_eq!(row.c8, u32::MAX);
|
||||
assert_eq!(row.c9, u64::MAX);
|
||||
assert_eq!(row.c10.unwrap(), f32::MAX);
|
||||
// assert_eq!(row.c11, f64::MAX);
|
||||
assert_eq!(row.c12, "ABC");
|
||||
assert_eq!(row.c13, "涛思数据");
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use chrono::{DateTime, Local};
|
||||
use taos::*;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let dsn = "taos://";
|
||||
|
||||
let opts = PoolBuilder::new()
|
||||
.max_size(5000) // max connections
|
||||
.max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
|
||||
.min_idle(Some(1000)) // minimal idle connections
|
||||
.connection_timeout(Duration::from_secs(2));
|
||||
|
||||
let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;
|
||||
|
||||
let taos = pool.get()?;
|
||||
|
||||
let db = "query";
|
||||
|
||||
// prepare database
|
||||
taos.exec_many([
|
||||
format!("DROP DATABASE IF EXISTS `{db}`"),
|
||||
format!("CREATE DATABASE `{db}`"),
|
||||
format!("USE `{db}`"),
|
||||
])
|
||||
.await?;
|
||||
|
||||
let inserted = taos.exec_many([
|
||||
// create super table
|
||||
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))",
|
||||
// create child table
|
||||
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
|
||||
// insert into child table
|
||||
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
|
||||
// insert with NULL values
|
||||
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
|
||||
// insert and automatically create table with tags if not exists
|
||||
"INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
|
||||
// insert many records in a single sql
|
||||
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
|
||||
]).await?;
|
||||
|
||||
assert_eq!(inserted, 6);
|
||||
loop {
|
||||
let count: usize = taos
|
||||
.query_one("select count(*) from `meters`")
|
||||
.await?
|
||||
.unwrap_or_default();
|
||||
|
||||
if count >= 6 {
|
||||
break;
|
||||
} else {
|
||||
println!("waiting for data");
|
||||
}
|
||||
}
|
||||
|
||||
let mut result = taos.query("select tbname, * from `meters`").await?;
|
||||
|
||||
for field in result.fields() {
|
||||
println!("got field: {}", field.name());
|
||||
}
|
||||
|
||||
// Query option 1, use rows stream.
|
||||
let mut rows = result.rows();
|
||||
let mut nrows = 0;
|
||||
while let Some(row) = rows.try_next().await? {
|
||||
for (col, (name, value)) in row.enumerate() {
|
||||
println!(
|
||||
"[{}] got value in col {} (named `{:>8}`): {}",
|
||||
nrows, col, name, value
|
||||
);
|
||||
}
|
||||
nrows += 1;
|
||||
}
|
||||
|
||||
// Query options 2, use deserialization with serde.
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
#[allow(dead_code)]
|
||||
struct Record {
|
||||
tbname: String,
|
||||
// deserialize timestamp to chrono::DateTime<Local>
|
||||
ts: DateTime<Local>,
|
||||
// float to f32
|
||||
current: Option<f32>,
|
||||
// int to i32
|
||||
voltage: Option<i32>,
|
||||
phase: Option<f32>,
|
||||
groupid: i32,
|
||||
// binary/varchar to String
|
||||
location: String,
|
||||
}
|
||||
|
||||
let records: Vec<Record> = taos
|
||||
.query("select tbname, * from `meters`")
|
||||
.await?
|
||||
.deserialize()
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
||||
dbg!(result.summary());
|
||||
assert_eq!(records.len(), 6);
|
||||
dbg!(records);
|
||||
Ok(())
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use chrono::{DateTime, Local};
|
||||
use taos::*;
|
||||
|
||||
// Query options 2, use deserialization with serde.
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
#[allow(dead_code)]
|
||||
struct Record {
|
||||
// deserialize timestamp to chrono::DateTime<Local>
|
||||
ts: DateTime<Local>,
|
||||
// float to f32
|
||||
current: Option<f32>,
|
||||
// int to i32
|
||||
voltage: Option<i32>,
|
||||
phase: Option<f32>,
|
||||
}
|
||||
|
||||
async fn prepare(taos: Taos) -> anyhow::Result<()> {
|
||||
let inserted = taos.exec_many([
|
||||
// create child table
|
||||
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
|
||||
// insert into child table
|
||||
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
|
||||
// insert with NULL values
|
||||
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
|
||||
// insert and automatically create table with tags if not exists
|
||||
"INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
|
||||
// insert many records in a single sql
|
||||
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
|
||||
]).await?;
|
||||
assert_eq!(inserted, 6);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
// std::env::set_var("RUST_LOG", "debug");
|
||||
pretty_env_logger::init();
|
||||
let dsn = "taos://localhost:6030";
|
||||
let builder = TaosBuilder::from_dsn(dsn)?;
|
||||
|
||||
let taos = builder.build()?;
|
||||
let db = "tmq";
|
||||
|
||||
// prepare database
|
||||
taos.exec_many([
|
||||
"DROP TOPIC IF EXISTS tmq_meters".to_string(),
|
||||
format!("DROP DATABASE IF EXISTS `{db}`"),
|
||||
format!("CREATE DATABASE `{db}`"),
|
||||
format!("USE `{db}`"),
|
||||
// create super table
|
||||
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))".to_string(),
|
||||
// create topic for subscription
|
||||
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
|
||||
])
|
||||
.await?;
|
||||
|
||||
let task = tokio::spawn(prepare(taos));
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
// subscribe
|
||||
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
|
||||
|
||||
let mut consumer = tmq.build()?;
|
||||
consumer.subscribe(["tmq_meters"]).await?;
|
||||
|
||||
{
|
||||
let mut stream = consumer.stream();
|
||||
|
||||
while let Some((offset, message)) = stream.try_next().await? {
|
||||
// get information from offset
|
||||
|
||||
// the topic
|
||||
let topic = offset.topic();
|
||||
// the vgroup id, like partition id in kafka.
|
||||
let vgroup_id = offset.vgroup_id();
|
||||
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
|
||||
|
||||
if let Some(data) = message.into_data() {
|
||||
while let Some(block) = data.fetch_raw_block().await? {
|
||||
// one block for one table, get table name if needed
|
||||
let name = block.table_name();
|
||||
let records: Vec<Record> = block.deserialize().try_collect()?;
|
||||
println!(
|
||||
"** table: {}, got {} records: {:#?}\n",
|
||||
name.unwrap(),
|
||||
records.len(),
|
||||
records
|
||||
);
|
||||
}
|
||||
}
|
||||
consumer.commit(offset).await?;
|
||||
}
|
||||
}
|
||||
|
||||
consumer.unsubscribe().await;
|
||||
|
||||
task.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
fn main() {
|
||||
println!("Hello, world!");
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
#include<taos.h>
|
Loading…
Reference in New Issue