Merge branch 'feature/query' of https://github.com/taosdata/TDengine into feature/query
This commit is contained in:
commit
d6bc3261c6
|
@ -724,6 +724,13 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
|
|||
return len;
|
||||
}
|
||||
|
||||
static void asyncCallback(void *param, TAOS_RES *tres, int code) {
|
||||
assert(param != NULL);
|
||||
SSqlObj *pSql = ((SSqlObj *)param);
|
||||
pSql->res.code = code;
|
||||
sem_post(&pSql->rspSem);
|
||||
}
|
||||
|
||||
int taos_validate_sql(TAOS *taos, const char *sql) {
|
||||
STscObj *pObj = (STscObj *)taos;
|
||||
if (pObj == NULL || pObj->signature != pObj) {
|
||||
|
@ -732,7 +739,8 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
|
|||
}
|
||||
|
||||
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
|
||||
|
||||
pSql->pTscObj = taos;
|
||||
pSql->signature = pSql;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
|
@ -766,10 +774,17 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
|
|||
pCmd->pTableList = NULL;
|
||||
}
|
||||
|
||||
pRes->code = (uint8_t)tsParseSql(pSql, false);
|
||||
int code = pRes->code;
|
||||
|
||||
tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
|
||||
pSql->fp = asyncCallback;
|
||||
pSql->fetchFp = asyncCallback;
|
||||
pSql->param = pSql;
|
||||
int code = tsParseSql(pSql, true);
|
||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
sem_wait(&pSql->rspSem);
|
||||
code = pSql->res.code;
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, code, taos_errstr(taos), pObj);
|
||||
}
|
||||
taos_free_result(pSql);
|
||||
|
||||
return code;
|
||||
|
@ -865,6 +880,8 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
|||
}
|
||||
|
||||
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
|
||||
pSql->pTscObj = taos;
|
||||
pSql->signature = pSql;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
|
||||
pRes->numOfTotal = 0; // the number of getting table meta from server
|
||||
|
|
|
@ -122,7 +122,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
|
|||
pQueryInfo->window.ekey = pStream->etime;
|
||||
}
|
||||
} else {
|
||||
pQueryInfo->window.skey = pStream->stime - pStream->interval;
|
||||
pQueryInfo->window.skey = pStream->stime;
|
||||
int64_t etime = taosGetTimestamp(pStream->precision);
|
||||
// delay to wait all data in last time window
|
||||
if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
|
@ -232,6 +232,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
|
|||
(*pStream->fp)(pStream->param, res, row);
|
||||
}
|
||||
|
||||
if (!pStream->isProject) {
|
||||
pStream->stime += pStream->slidingTime;
|
||||
}
|
||||
// actually only one row is returned. this following is not necessary
|
||||
taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream);
|
||||
} else { // numOfRows == 0, all data has been retrieved
|
||||
|
@ -432,6 +435,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
|
|||
} else { // timewindow based aggregation stream
|
||||
if (stime == 0) { // no data in meter till now
|
||||
stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval;
|
||||
stime -= pStream->interval;
|
||||
tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime);
|
||||
} else {
|
||||
int64_t newStime = (stime / pStream->interval) * pStream->interval;
|
||||
|
|
|
@ -34,6 +34,7 @@ typedef struct SSubscriptionProgress {
|
|||
typedef struct SSub {
|
||||
void * signature;
|
||||
char topic[32];
|
||||
sem_t sem;
|
||||
int64_t lastSyncTime;
|
||||
int64_t lastConsumeTime;
|
||||
TAOS * taos;
|
||||
|
@ -83,84 +84,108 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
|
|||
|
||||
static void asyncCallback(void *param, TAOS_RES *tres, int code) {
|
||||
assert(param != NULL);
|
||||
SSqlObj *pSql = ((SSqlObj *)param);
|
||||
|
||||
pSql->res.code = code;
|
||||
sem_post(&pSql->rspSem);
|
||||
SSub *pSub = ((SSub *)param);
|
||||
pSub->pSql->res.code = code;
|
||||
sem_post(&pSub->sem);
|
||||
}
|
||||
|
||||
|
||||
static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* sql) {
|
||||
SSub* pSub = NULL;
|
||||
int code = TSDB_CODE_SUCCESS, line = __LINE__;
|
||||
SSqlObj* pSql = NULL;
|
||||
|
||||
TRY( 8 ) {
|
||||
SSqlObj* pSql = calloc_throw(1, sizeof(SSqlObj));
|
||||
CLEANUP_PUSH_FREE(true, pSql);
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
SSub* pSub = calloc(1, sizeof(SSub));
|
||||
if (pSub == NULL) {
|
||||
line = __LINE__;
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto fail;
|
||||
}
|
||||
pSub->signature = pSub;
|
||||
if (tsem_init(&pSub->sem, 0, 0) == -1) {
|
||||
line = __LINE__;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto fail;
|
||||
}
|
||||
tstrncpy(pSub->topic, topic, sizeof(pSub->topic));
|
||||
pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress));
|
||||
if (pSub->progress == NULL) {
|
||||
line = __LINE__;
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (tsem_init(&pSql->rspSem, 0, 0) == -1) {
|
||||
THROW(TAOS_SYSTEM_ERROR(errno));
|
||||
}
|
||||
CLEANUP_PUSH_INT_PTR(true, tsem_destroy, &pSql->rspSem);
|
||||
pSql = calloc(1, sizeof(SSqlObj));
|
||||
if (pSql == NULL) {
|
||||
line = __LINE__;
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto fail;
|
||||
}
|
||||
pSql->signature = pSql;
|
||||
pSql->pTscObj = pObj;
|
||||
pSql->pSubscription = pSub;
|
||||
pSub->pSql = pSql;
|
||||
|
||||
pSql->signature = pSql;
|
||||
pSql->param = pSql;
|
||||
pSql->pTscObj = pObj;
|
||||
pSql->maxRetry = TSDB_MAX_REPLICA;
|
||||
pSql->fp = asyncCallback;
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
SSqlRes* pRes = &pSql->res;
|
||||
if (tsem_init(&pSql->rspSem, 0, 0) == -1) {
|
||||
line = __LINE__;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto fail;
|
||||
}
|
||||
|
||||
int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
THROW(code);
|
||||
}
|
||||
CLEANUP_PUSH_FREE(true, pCmd->payload);
|
||||
pSql->param = pSub;
|
||||
pSql->maxRetry = TSDB_MAX_REPLICA;
|
||||
pSql->fp = asyncCallback;
|
||||
pSql->fetchFp = asyncCallback;
|
||||
pSql->sqlstr = strdup(sql);
|
||||
if (pSql->sqlstr == NULL) {
|
||||
line = __LINE__;
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto fail;
|
||||
}
|
||||
strtolower(pSql->sqlstr, pSql->sqlstr);
|
||||
pRes->qhandle = 0;
|
||||
pRes->numOfRows = 1;
|
||||
|
||||
pRes->qhandle = 0;
|
||||
pRes->numOfRows = 1;
|
||||
code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
line = __LINE__;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
pSql->sqlstr = strdup_throw(sql);
|
||||
CLEANUP_PUSH_FREE(true, pSql->sqlstr);
|
||||
strtolower(pSql->sqlstr, pSql->sqlstr);
|
||||
code = tsParseSql(pSql, false);
|
||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
sem_wait(&pSub->sem);
|
||||
code = pSql->res.code;
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
line = __LINE__;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
code = tsParseSql(pSql, false);
|
||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
// wait for the callback function to post the semaphore
|
||||
sem_wait(&pSql->rspSem);
|
||||
code = pSql->res.code;
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscError("failed to parse sql statement: %s, error: %s", pSub->topic, tstrerror(code));
|
||||
THROW( code );
|
||||
}
|
||||
|
||||
if (pSql->cmd.command != TSDB_SQL_SELECT) {
|
||||
tscError("only 'select' statement is allowed in subscription: %s", pSub->topic);
|
||||
THROW( -1 ); // TODO
|
||||
}
|
||||
|
||||
pSub = calloc_throw(1, sizeof(SSub));
|
||||
CLEANUP_PUSH_FREE(true, pSub);
|
||||
pSql->pSubscription = pSub;
|
||||
pSub->pSql = pSql;
|
||||
pSub->signature = pSub;
|
||||
strncpy(pSub->topic, topic, sizeof(pSub->topic));
|
||||
pSub->topic[sizeof(pSub->topic) - 1] = 0;
|
||||
pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress));
|
||||
if (pSub->progress == NULL) {
|
||||
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
CLEANUP_EXECUTE();
|
||||
|
||||
} CATCH( code ) {
|
||||
tscError("failed to create subscription object: %s", tstrerror(code));
|
||||
CLEANUP_EXECUTE();
|
||||
pSub = NULL;
|
||||
|
||||
} END_TRY
|
||||
if (pSql->cmd.command != TSDB_SQL_SELECT) {
|
||||
line = __LINE__;
|
||||
code = TSDB_CODE_TSC_INVALID_SQL;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
return pSub;
|
||||
|
||||
fail:
|
||||
tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code));
|
||||
if (pSql != NULL) {
|
||||
tscFreeSqlObj(pSql);
|
||||
pSql = NULL;
|
||||
}
|
||||
if (pSub != NULL) {
|
||||
taosArrayDestroy(pSub->progress);
|
||||
tsem_destroy(&pSub->sem);
|
||||
free(pSub);
|
||||
pSub = NULL;
|
||||
}
|
||||
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
|
@ -405,9 +430,10 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
|
|||
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0;
|
||||
|
||||
pSql->fp = asyncCallback;
|
||||
pSql->param = pSql;
|
||||
pSql->fetchFp = asyncCallback;
|
||||
pSql->param = pSub;
|
||||
tscDoQuery(pSql);
|
||||
sem_wait(&pSql->rspSem);
|
||||
sem_wait(&pSub->sem);
|
||||
|
||||
if (pRes->code != TSDB_CODE_SUCCESS) {
|
||||
continue;
|
||||
|
@ -437,7 +463,9 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
|
|||
}
|
||||
|
||||
if (keepProgress) {
|
||||
tscSaveSubscriptionProgress(pSub);
|
||||
if (pSub->progress != NULL) {
|
||||
tscSaveSubscriptionProgress(pSub);
|
||||
}
|
||||
} else {
|
||||
char path[256];
|
||||
sprintf(path, "%s/subscribe/%s", tsDataDir, pSub->topic);
|
||||
|
@ -448,6 +476,7 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
|
|||
|
||||
tscFreeSqlObj(pSub->pSql);
|
||||
taosArrayDestroy(pSub->progress);
|
||||
tsem_destroy(&pSub->sem);
|
||||
memset(pSub, 0, sizeof(*pSub));
|
||||
free(pSub);
|
||||
}
|
||||
|
|
|
@ -67,8 +67,6 @@ DLL_EXPORT void taos_init();
|
|||
DLL_EXPORT void taos_cleanup();
|
||||
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
|
||||
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
|
||||
DLL_EXPORT TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t userLen,
|
||||
const char *pass, uint8_t passLen, const char *db, uint8_t dbLen, uint16_t port);
|
||||
DLL_EXPORT void taos_close(TAOS *taos);
|
||||
|
||||
typedef struct TAOS_BIND {
|
||||
|
@ -90,7 +88,6 @@ TAOS_RES * taos_stmt_use_result(TAOS_STMT *stmt);
|
|||
int taos_stmt_close(TAOS_STMT *stmt);
|
||||
|
||||
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
|
||||
DLL_EXPORT TAOS_RES *taos_query_c(TAOS *taos, const char *sql, uint32_t sqlLen);
|
||||
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
|
||||
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
|
||||
DLL_EXPORT void taos_free_result(TAOS_RES *res);
|
||||
|
|
|
@ -4964,7 +4964,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
|
|||
pQuery->current->lastKey, pQuery->window.ekey);
|
||||
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||
STableIdInfo tidInfo;
|
||||
STableId* id = TSDB_TABLEID(pQuery->current);
|
||||
STableId* id = TSDB_TABLEID(pQuery->current->pTable);
|
||||
|
||||
tidInfo.uid = id->uid;
|
||||
tidInfo.tid = id->tid;
|
||||
|
|
|
@ -0,0 +1,474 @@
|
|||
// sample code to verify all TDengine API
|
||||
// to compile: gcc -o apitest apitest.c -ltaos
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <taos.h>
|
||||
#include <unistd.h>
|
||||
|
||||
|
||||
static void prepare_data(TAOS* taos) {
|
||||
taos_query(taos, "drop database if exists test;");
|
||||
usleep(100000);
|
||||
taos_query(taos, "create database test;");
|
||||
usleep(100000);
|
||||
taos_select_db(taos, "test");
|
||||
|
||||
taos_query(taos, "create table meters(ts timestamp, a int) tags(area int);");
|
||||
|
||||
taos_query(taos, "create table t0 using meters tags(0);");
|
||||
taos_query(taos, "create table t1 using meters tags(1);");
|
||||
taos_query(taos, "create table t2 using meters tags(2);");
|
||||
taos_query(taos, "create table t3 using meters tags(3);");
|
||||
taos_query(taos, "create table t4 using meters tags(4);");
|
||||
taos_query(taos, "create table t5 using meters tags(5);");
|
||||
taos_query(taos, "create table t6 using meters tags(6);");
|
||||
taos_query(taos, "create table t7 using meters tags(7);");
|
||||
taos_query(taos, "create table t8 using meters tags(8);");
|
||||
taos_query(taos, "create table t9 using meters tags(9);");
|
||||
|
||||
TAOS_RES* res = taos_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0)"
|
||||
" ('2020-01-01 00:01:00.000', 0)"
|
||||
" ('2020-01-01 00:02:00.000', 0)"
|
||||
" t1 values('2020-01-01 00:00:00.000', 0)"
|
||||
" ('2020-01-01 00:01:00.000', 0)"
|
||||
" ('2020-01-01 00:02:00.000', 0)"
|
||||
" ('2020-01-01 00:03:00.000', 0)"
|
||||
" t2 values('2020-01-01 00:00:00.000', 0)"
|
||||
" ('2020-01-01 00:01:00.000', 0)"
|
||||
" ('2020-01-01 00:01:01.000', 0)"
|
||||
" ('2020-01-01 00:01:02.000', 0)"
|
||||
" t3 values('2020-01-01 00:01:02.000', 0)"
|
||||
" t4 values('2020-01-01 00:01:02.000', 0)"
|
||||
" t5 values('2020-01-01 00:01:02.000', 0)"
|
||||
" t6 values('2020-01-01 00:01:02.000', 0)"
|
||||
" t7 values('2020-01-01 00:01:02.000', 0)"
|
||||
" t8 values('2020-01-01 00:01:02.000', 0)"
|
||||
" t9 values('2020-01-01 00:01:02.000', 0)");
|
||||
int affected = taos_affected_rows(res);
|
||||
if (affected != 18) {
|
||||
printf("\033[31m%d rows affected by last insert statement, but it should be 18\033[0m\n", affected);
|
||||
}
|
||||
// super tables subscription
|
||||
usleep(1000000);
|
||||
}
|
||||
|
||||
|
||||
static int print_result(TAOS_RES* res, int blockFetch) {
|
||||
TAOS_ROW row = NULL;
|
||||
int num_fields = taos_num_fields(res);
|
||||
TAOS_FIELD* fields = taos_fetch_fields(res);
|
||||
int nRows = 0;
|
||||
|
||||
if (blockFetch) {
|
||||
int rows = 0;
|
||||
while ((rows = taos_fetch_block(res, &row))) {
|
||||
for (int i = 0; i < rows; i++) {
|
||||
char temp[256];
|
||||
taos_print_row(temp, row + i, fields, num_fields);
|
||||
puts(temp);
|
||||
}
|
||||
nRows += rows;
|
||||
}
|
||||
} else {
|
||||
while ((row = taos_fetch_row(res))) {
|
||||
char temp[256];
|
||||
taos_print_row(temp, row, fields, num_fields);
|
||||
puts(temp);
|
||||
nRows++;
|
||||
}
|
||||
}
|
||||
|
||||
printf("%d rows consumed.\n", nRows);
|
||||
return nRows;
|
||||
}
|
||||
|
||||
|
||||
static void check_row_count(int line, TAOS_RES* res, int expected) {
|
||||
int actual = print_result(res, expected % 2);
|
||||
if (actual != expected) {
|
||||
printf("\033[31mline %d: row count mismatch, expected: %d, actual: %d\033[0m\n", line, expected, actual);
|
||||
} else {
|
||||
printf("line %d: %d rows consumed as expected\n", line, actual);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void verify_query(TAOS* taos) {
|
||||
prepare_data(taos);
|
||||
|
||||
int code = taos_load_table_info(taos, "t0,t1,t2,t3,t4,t5,t6,t7,t8,t9");
|
||||
if (code != 0) {
|
||||
printf("\033[31mfailed to load table info: 0x%08x\033[0m\n", code);
|
||||
}
|
||||
|
||||
code = taos_validate_sql(taos, "select * from nonexisttable");
|
||||
if (code == 0) {
|
||||
printf("\033[31mimpossible, the table does not exists\033[0m\n");
|
||||
}
|
||||
|
||||
code = taos_validate_sql(taos, "select * from meters");
|
||||
if (code != 0) {
|
||||
printf("\033[31mimpossible, the table does exists: 0x%08x\033[0m\n", code);
|
||||
}
|
||||
|
||||
TAOS_RES* res = taos_query(taos, "select * from meters");
|
||||
check_row_count(__LINE__, res, 18);
|
||||
printf("result precision is: %d\n", taos_result_precision(res));
|
||||
int c = taos_field_count(res);
|
||||
printf("field count is: %d\n", c);
|
||||
int* lengths = taos_fetch_lengths(res);
|
||||
for (int i = 0; i < c; i++) {
|
||||
printf("length of column %d is %d\n", i, lengths[i]);
|
||||
}
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos, "select * from t0");
|
||||
check_row_count(__LINE__, res, 3);
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos, "select * from nonexisttable");
|
||||
code = taos_errno(res);
|
||||
printf("code=%d, error msg=%s\n", code, taos_errstr(res));
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos, "select * from meters");
|
||||
taos_stop_query(res);
|
||||
}
|
||||
|
||||
|
||||
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
|
||||
int rows = print_result(res, *(int*)param);
|
||||
printf("%d rows consumed in subscribe_callback\n", rows);
|
||||
}
|
||||
|
||||
static void verify_subscribe(TAOS* taos) {
|
||||
prepare_data(taos);
|
||||
|
||||
TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
|
||||
TAOS_RES* res = taos_consume(tsub);
|
||||
check_row_count(__LINE__, res, 18);
|
||||
|
||||
res = taos_consume(tsub);
|
||||
check_row_count(__LINE__, res, 0);
|
||||
|
||||
taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
|
||||
taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
|
||||
res = taos_consume(tsub);
|
||||
check_row_count(__LINE__, res, 2);
|
||||
|
||||
taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
|
||||
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
|
||||
res = taos_consume(tsub);
|
||||
check_row_count(__LINE__, res, 2);
|
||||
|
||||
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
|
||||
res = taos_consume(tsub);
|
||||
check_row_count(__LINE__, res, 1);
|
||||
|
||||
// keep progress information and restart subscription
|
||||
taos_unsubscribe(tsub, 1);
|
||||
taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
|
||||
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
|
||||
res = taos_consume(tsub);
|
||||
check_row_count(__LINE__, res, 24);
|
||||
|
||||
// keep progress information and continue previous subscription
|
||||
taos_unsubscribe(tsub, 1);
|
||||
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
|
||||
res = taos_consume(tsub);
|
||||
check_row_count(__LINE__, res, 0);
|
||||
|
||||
// don't keep progress information and continue previous subscription
|
||||
taos_unsubscribe(tsub, 0);
|
||||
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
|
||||
res = taos_consume(tsub);
|
||||
check_row_count(__LINE__, res, 24);
|
||||
|
||||
// single meter subscription
|
||||
|
||||
taos_unsubscribe(tsub, 0);
|
||||
tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0);
|
||||
res = taos_consume(tsub);
|
||||
check_row_count(__LINE__, res, 5);
|
||||
|
||||
res = taos_consume(tsub);
|
||||
check_row_count(__LINE__, res, 0);
|
||||
|
||||
taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
|
||||
res = taos_consume(tsub);
|
||||
check_row_count(__LINE__, res, 1);
|
||||
|
||||
taos_unsubscribe(tsub, 0);
|
||||
|
||||
int blockFetch = 0;
|
||||
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", subscribe_callback, &blockFetch, 1000);
|
||||
usleep(2000000);
|
||||
taos_query(taos, "insert into t0 values('2020-01-01 00:05:00.001', 0);");
|
||||
usleep(2000000);
|
||||
taos_unsubscribe(tsub, 0);
|
||||
}
|
||||
|
||||
|
||||
void verify_prepare(TAOS* taos) {
|
||||
TAOS_RES* result = taos_query(taos, "drop database if exists test;");
|
||||
usleep(100000);
|
||||
taos_query(taos, "create database test;");
|
||||
|
||||
int code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("\033[31mfailed to create database, reason:%s\033[0m\n", taos_errstr(result));
|
||||
taos_free_result(result);
|
||||
return;
|
||||
}
|
||||
taos_free_result(result);
|
||||
|
||||
usleep(100000);
|
||||
taos_select_db(taos, "test");
|
||||
|
||||
// create table
|
||||
const char* sql = "create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), blob nchar(10))";
|
||||
result = taos_query(taos, sql);
|
||||
code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("\033[31mfailed to create table, reason:%s\033[0m\n", taos_errstr(result));
|
||||
taos_free_result(result);
|
||||
return;
|
||||
}
|
||||
taos_free_result(result);
|
||||
|
||||
// insert 10 records
|
||||
struct {
|
||||
int64_t ts;
|
||||
int8_t b;
|
||||
int8_t v1;
|
||||
int16_t v2;
|
||||
int32_t v4;
|
||||
int64_t v8;
|
||||
float f4;
|
||||
double f8;
|
||||
char bin[40];
|
||||
char blob[80];
|
||||
} v = {0};
|
||||
|
||||
TAOS_STMT* stmt = taos_stmt_init(taos);
|
||||
TAOS_BIND params[10];
|
||||
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
params[0].buffer_length = sizeof(v.ts);
|
||||
params[0].buffer = &v.ts;
|
||||
params[0].length = ¶ms[0].buffer_length;
|
||||
params[0].is_null = NULL;
|
||||
|
||||
params[1].buffer_type = TSDB_DATA_TYPE_BOOL;
|
||||
params[1].buffer_length = sizeof(v.b);
|
||||
params[1].buffer = &v.b;
|
||||
params[1].length = ¶ms[1].buffer_length;
|
||||
params[1].is_null = NULL;
|
||||
|
||||
params[2].buffer_type = TSDB_DATA_TYPE_TINYINT;
|
||||
params[2].buffer_length = sizeof(v.v1);
|
||||
params[2].buffer = &v.v1;
|
||||
params[2].length = ¶ms[2].buffer_length;
|
||||
params[2].is_null = NULL;
|
||||
|
||||
params[3].buffer_type = TSDB_DATA_TYPE_SMALLINT;
|
||||
params[3].buffer_length = sizeof(v.v2);
|
||||
params[3].buffer = &v.v2;
|
||||
params[3].length = ¶ms[3].buffer_length;
|
||||
params[3].is_null = NULL;
|
||||
|
||||
params[4].buffer_type = TSDB_DATA_TYPE_INT;
|
||||
params[4].buffer_length = sizeof(v.v4);
|
||||
params[4].buffer = &v.v4;
|
||||
params[4].length = ¶ms[4].buffer_length;
|
||||
params[4].is_null = NULL;
|
||||
|
||||
params[5].buffer_type = TSDB_DATA_TYPE_BIGINT;
|
||||
params[5].buffer_length = sizeof(v.v8);
|
||||
params[5].buffer = &v.v8;
|
||||
params[5].length = ¶ms[5].buffer_length;
|
||||
params[5].is_null = NULL;
|
||||
|
||||
params[6].buffer_type = TSDB_DATA_TYPE_FLOAT;
|
||||
params[6].buffer_length = sizeof(v.f4);
|
||||
params[6].buffer = &v.f4;
|
||||
params[6].length = ¶ms[6].buffer_length;
|
||||
params[6].is_null = NULL;
|
||||
|
||||
params[7].buffer_type = TSDB_DATA_TYPE_DOUBLE;
|
||||
params[7].buffer_length = sizeof(v.f8);
|
||||
params[7].buffer = &v.f8;
|
||||
params[7].length = ¶ms[7].buffer_length;
|
||||
params[7].is_null = NULL;
|
||||
|
||||
params[8].buffer_type = TSDB_DATA_TYPE_BINARY;
|
||||
params[8].buffer_length = sizeof(v.bin);
|
||||
params[8].buffer = v.bin;
|
||||
params[8].length = ¶ms[8].buffer_length;
|
||||
params[8].is_null = NULL;
|
||||
|
||||
strcpy(v.blob, "一二三四五六七八九十");
|
||||
params[9].buffer_type = TSDB_DATA_TYPE_NCHAR;
|
||||
params[9].buffer_length = strlen(v.blob);
|
||||
params[9].buffer = v.blob;
|
||||
params[9].length = ¶ms[9].buffer_length;
|
||||
params[9].is_null = NULL;
|
||||
|
||||
int is_null = 1;
|
||||
|
||||
sql = "insert into m1 values(?,?,?,?,?,?,?,?,?,?)";
|
||||
code = taos_stmt_prepare(stmt, sql, 0);
|
||||
if (code != 0){
|
||||
printf("\033[31mfailed to execute taos_stmt_prepare. code:0x%x\033[0m\n", code);
|
||||
}
|
||||
v.ts = 1591060628000;
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
v.ts += 1;
|
||||
for (int j = 1; j < 10; ++j) {
|
||||
params[j].is_null = ((i == j) ? &is_null : 0);
|
||||
}
|
||||
v.b = (int8_t)i % 2;
|
||||
v.v1 = (int8_t)i;
|
||||
v.v2 = (int16_t)(i * 2);
|
||||
v.v4 = (int32_t)(i * 4);
|
||||
v.v8 = (int64_t)(i * 8);
|
||||
v.f4 = (float)(i * 40);
|
||||
v.f8 = (double)(i * 80);
|
||||
for (int j = 0; j < sizeof(v.bin) - 1; ++j) {
|
||||
v.bin[j] = (char)(i + '0');
|
||||
}
|
||||
|
||||
taos_stmt_bind_param(stmt, params);
|
||||
taos_stmt_add_batch(stmt);
|
||||
}
|
||||
if (taos_stmt_execute(stmt) != 0) {
|
||||
printf("\033[31mfailed to execute insert statement.\033[0m\n");
|
||||
return;
|
||||
}
|
||||
taos_stmt_close(stmt);
|
||||
|
||||
// query the records
|
||||
stmt = taos_stmt_init(taos);
|
||||
taos_stmt_prepare(stmt, "SELECT * FROM m1 WHERE v1 > ? AND v2 < ?", 0);
|
||||
v.v1 = 5;
|
||||
v.v2 = 15;
|
||||
taos_stmt_bind_param(stmt, params + 2);
|
||||
if (taos_stmt_execute(stmt) != 0) {
|
||||
printf("\033[31mfailed to execute select statement.\033[0m\n");
|
||||
return;
|
||||
}
|
||||
|
||||
result = taos_stmt_use_result(stmt);
|
||||
|
||||
TAOS_ROW row;
|
||||
int rows = 0;
|
||||
int num_fields = taos_num_fields(result);
|
||||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||
char temp[256];
|
||||
|
||||
// fetch the records row by row
|
||||
while ((row = taos_fetch_row(result))) {
|
||||
rows++;
|
||||
taos_print_row(temp, row, fields, num_fields);
|
||||
printf("%s\n", temp);
|
||||
}
|
||||
|
||||
taos_free_result(result);
|
||||
taos_stmt_close(stmt);
|
||||
}
|
||||
|
||||
void retrieve_callback(void *param, TAOS_RES *tres, int numOfRows)
|
||||
{
|
||||
if (numOfRows > 0) {
|
||||
printf("%d rows async retrieved\n", numOfRows);
|
||||
taos_fetch_rows_a(tres, retrieve_callback, param);
|
||||
} else {
|
||||
if (numOfRows < 0) {
|
||||
printf("\033[31masync retrieve failed, code: %d\033[0m\n", numOfRows);
|
||||
} else {
|
||||
printf("async retrieve completed\n");
|
||||
}
|
||||
taos_free_result(tres);
|
||||
}
|
||||
}
|
||||
|
||||
void select_callback(void *param, TAOS_RES *tres, int code)
|
||||
{
|
||||
if (code == 0 && tres) {
|
||||
taos_fetch_rows_a(tres, retrieve_callback, param);
|
||||
} else {
|
||||
printf("\033[31masync select failed, code: %d\033[0m\n", code);
|
||||
}
|
||||
}
|
||||
|
||||
void verify_async(TAOS* taos) {
|
||||
prepare_data(taos);
|
||||
taos_query_a(taos, "select * from meters", select_callback, NULL);
|
||||
usleep(1000000);
|
||||
}
|
||||
|
||||
void stream_callback(void *param, TAOS_RES *res, TAOS_ROW row) {
|
||||
int num_fields = taos_num_fields(res);
|
||||
TAOS_FIELD* fields = taos_fetch_fields(res);
|
||||
|
||||
printf("got one row from stream_callback\n");
|
||||
char temp[256];
|
||||
taos_print_row(temp, row, fields, num_fields);
|
||||
puts(temp);
|
||||
}
|
||||
|
||||
void verify_stream(TAOS* taos) {
|
||||
prepare_data(taos);
|
||||
TAOS_STREAM* strm = taos_open_stream(
|
||||
taos,
|
||||
"select count(*) from meters interval(1m)",
|
||||
stream_callback,
|
||||
0,
|
||||
NULL,
|
||||
NULL);
|
||||
printf("waiting for stream data\n");
|
||||
usleep(100000);
|
||||
taos_query(taos, "insert into t0 values(now, 0)(now+5s,1)(now+10s, 2);");
|
||||
usleep(200000000);
|
||||
taos_close_stream(strm);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
const char* host = "127.0.0.1";
|
||||
const char* user = "root";
|
||||
const char* passwd = "taosdata";
|
||||
|
||||
taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
|
||||
taos_init();
|
||||
|
||||
TAOS* taos = taos_connect(host, user, passwd, "", 0);
|
||||
if (taos == NULL) {
|
||||
printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
char* info = taos_get_server_info(taos);
|
||||
printf("server info: %s\n", info);
|
||||
info = taos_get_client_info(taos);
|
||||
printf("client info: %s\n", info);
|
||||
|
||||
printf("************ verify query *************\n");
|
||||
verify_query(taos);
|
||||
|
||||
printf("********* verify async query **********\n");
|
||||
verify_async(taos);
|
||||
|
||||
printf("*********** verify subscribe ************\n");
|
||||
verify_subscribe(taos);
|
||||
|
||||
printf("************ verify prepare *************\n");
|
||||
verify_prepare(taos);
|
||||
|
||||
printf("************ verify stream *************\n");
|
||||
verify_stream(taos);
|
||||
printf("done\n");
|
||||
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
}
|
|
@ -4,7 +4,6 @@
|
|||
ROOT=./
|
||||
TARGET=exe
|
||||
LFLAGS = '-Wl,-rpath,/usr/local/taos/driver/' -ltaos -lpthread -lm -lrt
|
||||
#LFLAGS = '-Wl,-rpath,/home/zbm/project/td/debug/build/lib/' -L/home/zbm/project/td/debug/build/lib -ltaos -lpthread -lm -lrt
|
||||
CFLAGS = -O3 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion -Wno-char-subscripts -D_REENTRANT -Wno-format -D_REENTRANT -DLINUX -msse4.2 -Wno-unused-function -D_M_X64 \
|
||||
-I/usr/local/taos/include -std=gnu99
|
||||
|
||||
|
@ -16,6 +15,7 @@ exe:
|
|||
gcc $(CFLAGS) ./prepare.c -o $(ROOT)/prepare $(LFLAGS)
|
||||
gcc $(CFLAGS) ./stream.c -o $(ROOT)/stream $(LFLAGS)
|
||||
gcc $(CFLAGS) ./subscribe.c -o $(ROOT)subscribe $(LFLAGS)
|
||||
gcc $(CFLAGS) ./apitest.c -o $(ROOT)apitest $(LFLAGS)
|
||||
|
||||
clean:
|
||||
rm $(ROOT)/asyncdemo
|
||||
|
|
Loading…
Reference in New Issue