Merge branch 'docs/sheyj-3.0' of github.com:taosdata/TDengine into docs/sheyj-3.0
This commit is contained in:
commit
0017c92d84
|
@ -22,55 +22,57 @@
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
|
|
||||||
|
|
||||||
static int DemoCreateDB() {
|
static int DemoCreateDB() {
|
||||||
// ANCHOR: create_db_and_table
|
// ANCHOR: create_db_and_table
|
||||||
const char *host = "localhost";
|
const char *host = "localhost";
|
||||||
const char *user = "root";
|
const char *user = "root";
|
||||||
const char *password = "taosdata";
|
const char *password = "taosdata";
|
||||||
uint16_t port = 6030;
|
uint16_t port = 6030;
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
||||||
// connect
|
// connect
|
||||||
TAOS *taos = taos_connect(host, user, password, NULL, port);
|
TAOS *taos = taos_connect(host, user, password, NULL, port);
|
||||||
if (taos == NULL) {
|
if (taos == NULL) {
|
||||||
printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
|
printf("Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
|
||||||
taos_cleanup();
|
taos_errstr(NULL));
|
||||||
return -1;
|
taos_cleanup();
|
||||||
}
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
// create database
|
// create database
|
||||||
TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power");
|
TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power");
|
||||||
code = taos_errno(result);
|
code = taos_errno(result);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
printf("Failed to create database power, Server: %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, code, taos_errstr(result));
|
printf("Failed to create database power, Server: %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, code,
|
||||||
|
taos_errstr(result));
|
||||||
|
taos_close(taos);
|
||||||
|
taos_cleanup();
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(result);
|
||||||
|
printf("Create database power successfully.\n");
|
||||||
|
|
||||||
|
// create table
|
||||||
|
const char *sql =
|
||||||
|
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId "
|
||||||
|
"INT, location BINARY(24))";
|
||||||
|
result = taos_query(taos, sql);
|
||||||
|
code = taos_errno(result);
|
||||||
|
if (code != 0) {
|
||||||
|
printf("Failed to create stable power.meters, Server: %s:%hu, ErrCode: 0x%x, ErrMessage: %s\n.", host, port, code,
|
||||||
|
taos_errstr(result));
|
||||||
|
taos_close(taos);
|
||||||
|
taos_cleanup();
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(result);
|
||||||
|
printf("Create stable power.meters successfully.\n");
|
||||||
|
|
||||||
|
// close & clean
|
||||||
taos_close(taos);
|
taos_close(taos);
|
||||||
taos_cleanup();
|
taos_cleanup();
|
||||||
return -1;
|
return 0;
|
||||||
}
|
// ANCHOR_END: create_db_and_table
|
||||||
taos_free_result(result);
|
|
||||||
printf("Create database power successfully.\n");
|
|
||||||
|
|
||||||
// create table
|
|
||||||
const char* sql = "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))";
|
|
||||||
result = taos_query(taos, sql);
|
|
||||||
code = taos_errno(result);
|
|
||||||
if (code != 0) {
|
|
||||||
printf("Failed to create stable power.meters, Server: %s:%hu, ErrCode: 0x%x, ErrMessage: %s\n.", host, port, code, taos_errstr(result));
|
|
||||||
taos_close(taos);
|
|
||||||
taos_cleanup();
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taos_free_result(result);
|
|
||||||
printf("Create stable meters successfully.\n");
|
|
||||||
|
|
||||||
// close & clean
|
|
||||||
taos_close(taos);
|
|
||||||
taos_cleanup();
|
|
||||||
return 0;
|
|
||||||
// ANCHOR_END: create_db_and_table
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) { return DemoCreateDB(); }
|
||||||
return DemoCreateDB();
|
|
||||||
}
|
|
||||||
|
|
|
@ -28,6 +28,18 @@ volatile int thread_stop = 0;
|
||||||
static int running = 1;
|
static int running = 1;
|
||||||
const char* topic_name = "topic_meters";
|
const char* topic_name = "topic_meters";
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
const char* enable_auto_commit;
|
||||||
|
const char* auto_commit_interval_ms;
|
||||||
|
const char* group_id;
|
||||||
|
const char* client_id;
|
||||||
|
const char* td_connect_host;
|
||||||
|
const char* td_connect_port;
|
||||||
|
const char* td_connect_user;
|
||||||
|
const char* td_connect_pass;
|
||||||
|
const char* auto_offset_reset;
|
||||||
|
} ConsumerConfig;
|
||||||
|
|
||||||
void* prepare_data(void* arg) {
|
void* prepare_data(void* arg) {
|
||||||
const char *host = "localhost";
|
const char *host = "localhost";
|
||||||
const char *user = "root";
|
const char *user = "root";
|
||||||
|
@ -205,7 +217,7 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ANCHOR: create_consumer_1
|
// ANCHOR: create_consumer_1
|
||||||
tmq_t* build_consumer() {
|
tmq_t* build_consumer(const ConsumerConfig* config) {
|
||||||
tmq_conf_res_t code;
|
tmq_conf_res_t code;
|
||||||
tmq_t* tmq = NULL;
|
tmq_t* tmq = NULL;
|
||||||
|
|
||||||
|
@ -213,37 +225,47 @@ tmq_t* build_consumer() {
|
||||||
tmq_conf_t* conf = tmq_conf_new();
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
|
|
||||||
// set the configuration parameters
|
// set the configuration parameters
|
||||||
code = tmq_conf_set(conf, "enable.auto.commit", "true");
|
code = tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
|
||||||
if (TMQ_CONF_OK != code) {
|
if (TMQ_CONF_OK != code) {
|
||||||
tmq_conf_destroy(conf);
|
tmq_conf_destroy(conf);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
code = tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
|
||||||
if (TMQ_CONF_OK != code) {
|
if (TMQ_CONF_OK != code) {
|
||||||
tmq_conf_destroy(conf);
|
tmq_conf_destroy(conf);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
code = tmq_conf_set(conf, "group.id", "group1");
|
code = tmq_conf_set(conf, "group.id", config->group_id);
|
||||||
if (TMQ_CONF_OK != code) {
|
if (TMQ_CONF_OK != code) {
|
||||||
tmq_conf_destroy(conf);
|
tmq_conf_destroy(conf);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
code = tmq_conf_set(conf, "client.id", "client1");
|
code = tmq_conf_set(conf, "client.id", config->client_id);
|
||||||
if (TMQ_CONF_OK != code) {
|
if (TMQ_CONF_OK != code) {
|
||||||
tmq_conf_destroy(conf);
|
tmq_conf_destroy(conf);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
code = tmq_conf_set(conf, "td.connect.user", "root");
|
code = tmq_conf_set(conf, "td.connect.ip", config->td_connect_host);
|
||||||
if (TMQ_CONF_OK != code) {
|
if (TMQ_CONF_OK != code) {
|
||||||
tmq_conf_destroy(conf);
|
tmq_conf_destroy(conf);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
code = tmq_conf_set(conf, "td.connect.port", config->td_connect_port);
|
||||||
if (TMQ_CONF_OK != code) {
|
if (TMQ_CONF_OK != code) {
|
||||||
tmq_conf_destroy(conf);
|
tmq_conf_destroy(conf);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
code = tmq_conf_set(conf, "auto.offset.reset", "latest");
|
code = tmq_conf_set(conf, "td.connect.user", config->td_connect_user);
|
||||||
|
if (TMQ_CONF_OK != code) {
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
code = tmq_conf_set(conf, "td.connect.pass", config->td_connect_pass);
|
||||||
|
if (TMQ_CONF_OK != code) {
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
code = tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
|
||||||
if (TMQ_CONF_OK != code) {
|
if (TMQ_CONF_OK != code) {
|
||||||
tmq_conf_destroy(conf);
|
tmq_conf_destroy(conf);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -392,16 +414,29 @@ int main(int argc, char* argv[]) {
|
||||||
|
|
||||||
if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) {
|
if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) {
|
||||||
fprintf(stderr, "Failed to create thread.\n");
|
fprintf(stderr, "Failed to create thread.\n");
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ANCHOR: create_consumer_2
|
|
||||||
tmq_t* tmq = build_consumer();
|
|
||||||
if (NULL == tmq) {
|
|
||||||
fprintf(stderr, "Failed to create consumer.\n");
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
fprintf(stdout, "Create consumer successfully.\n");
|
|
||||||
|
ConsumerConfig config = {
|
||||||
|
.enable_auto_commit = "true",
|
||||||
|
.auto_commit_interval_ms = "1000",
|
||||||
|
.group_id = "group1",
|
||||||
|
.client_id = "client1",
|
||||||
|
.td_connect_host = "localhost",
|
||||||
|
.td_connect_port = "6030",
|
||||||
|
.td_connect_user = "root",
|
||||||
|
.td_connect_pass = "taosdata",
|
||||||
|
.auto_offset_reset = "latest"
|
||||||
|
};
|
||||||
|
|
||||||
|
// ANCHOR: create_consumer_2
|
||||||
|
tmq_t* tmq = build_consumer(&config);
|
||||||
|
if (NULL == tmq) {
|
||||||
|
fprintf(stderr, "Failed to create native consumer, host: %s, groupId: %s, , clientId: %s.\n", config.td_connect_host, config.group_id, config.client_id);
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, , clientId: %s.\n", config.td_connect_host, config.group_id, config.client_id);
|
||||||
|
}
|
||||||
|
|
||||||
// ANCHOR_END: create_consumer_2
|
// ANCHOR_END: create_consumer_2
|
||||||
|
|
||||||
|
|
|
@ -18,11 +18,11 @@ async function createConsumer() {
|
||||||
]);
|
]);
|
||||||
try {
|
try {
|
||||||
return await taos.tmqConnect(configMap);
|
return await taos.tmqConnect(configMap);
|
||||||
}catch (err) {
|
} catch (err) {
|
||||||
console.log(err);
|
console.log(err);
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
// ANCHOR_END: create_consumer
|
// ANCHOR_END: create_consumer
|
||||||
|
|
||||||
|
@ -33,7 +33,7 @@ async function prepare() {
|
||||||
conf.setDb('power');
|
conf.setDb('power');
|
||||||
const createDB = `CREATE DATABASE IF NOT EXISTS POWER ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`;
|
const createDB = `CREATE DATABASE IF NOT EXISTS POWER ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`;
|
||||||
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
|
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
|
||||||
|
|
||||||
let wsSql = await taos.sqlConnect(conf);
|
let wsSql = await taos.sqlConnect(conf);
|
||||||
await wsSql.exec(createDB);
|
await wsSql.exec(createDB);
|
||||||
await wsSql.exec(createStable);
|
await wsSql.exec(createStable);
|
||||||
|
@ -57,9 +57,9 @@ async function subscribe(consumer) {
|
||||||
for (let [key, value] of res) {
|
for (let [key, value] of res) {
|
||||||
console.log(key, value);
|
console.log(key, value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}catch (err) {
|
} catch (err) {
|
||||||
console.error("Failed to poll data; err.code, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
console.error("Failed to poll data; ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,7 +76,7 @@ async function test() {
|
||||||
let res = new Map();
|
let res = new Map();
|
||||||
while (res.size == 0) {
|
while (res.size == 0) {
|
||||||
res = await consumer.poll(100);
|
res = await consumer.poll(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
let assignment = await consumer.assignment();
|
let assignment = await consumer.assignment();
|
||||||
await consumer.seekToBeginning(assignment);
|
await consumer.seekToBeginning(assignment);
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import taos
|
import taos
|
||||||
|
|
||||||
conn = None
|
conn = None
|
||||||
reqId = 1
|
reqId = 3
|
||||||
host="localhost"
|
host="localhost"
|
||||||
port=6030
|
port=6030
|
||||||
try:
|
try:
|
||||||
|
@ -21,4 +21,4 @@ except Exception as err:
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -2,7 +2,7 @@ import taosrest
|
||||||
|
|
||||||
client = None
|
client = None
|
||||||
url="http://localhost:6041"
|
url="http://localhost:6041"
|
||||||
reqId = 1
|
reqId = 3
|
||||||
try:
|
try:
|
||||||
client = taosrest.RestClient(url=url,
|
client = taosrest.RestClient(url=url,
|
||||||
user="root",
|
user="root",
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import taosws
|
import taosws
|
||||||
|
|
||||||
conn = None
|
conn = None
|
||||||
reqId = 1
|
reqId = 3
|
||||||
host="localhost"
|
host="localhost"
|
||||||
port=6041
|
port=6041
|
||||||
try:
|
try:
|
||||||
|
@ -12,7 +12,7 @@ try:
|
||||||
port=port,
|
port=port,
|
||||||
)
|
)
|
||||||
|
|
||||||
result = conn.query_with_req_id("SELECT ts, current, location FROM power.meters limit 100", req_id=1)
|
result = conn.query_with_req_id("SELECT ts, current, location FROM power.meters limit 100", req_id=3)
|
||||||
# Get data from result as list of tuple
|
# Get data from result as list of tuple
|
||||||
for row in result:
|
for row in result:
|
||||||
print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")
|
print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")
|
||||||
|
@ -21,4 +21,4 @@ except Exception as err:
|
||||||
print(f"Failed to execute sql with reqId:{reqId}, db addr:{host}:{port} ; ErrMessage:{err}")
|
print(f"Failed to execute sql with reqId:{reqId}, db addr:{host}:{port} ; ErrMessage:{err}")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -42,10 +42,10 @@ public class ConsumerLoopFull {
|
||||||
return consumer;
|
return consumer;
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("Failed to create native consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to create native consumer, host: " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
throw new SQLException("Failed to create consumer", ex);
|
throw new SQLException("Failed to create consumer", ex);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
System.out.println("Failed to create native consumer, host : " + config.getProperty("bootstrap.servers")
|
System.out.println("Failed to create native consumer, host: " + config.getProperty("bootstrap.servers")
|
||||||
+ "; ErrMessage: " + ex.getMessage());
|
+ "; ErrMessage: " + ex.getMessage());
|
||||||
throw new SQLException("Failed to create consumer", ex);
|
throw new SQLException("Failed to create consumer", ex);
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,10 +40,10 @@ public class WsConsumerLoopFull {
|
||||||
return consumer;
|
return consumer;
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to create websocket consumer, host: " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
throw new SQLException("Failed to create consumer", ex);
|
throw new SQLException("Failed to create consumer", ex);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers")
|
System.out.println("Failed to create websocket consumer, host: " + config.getProperty("bootstrap.servers")
|
||||||
+ "; ErrMessage: " + ex.getMessage());
|
+ "; ErrMessage: " + ex.getMessage());
|
||||||
throw new SQLException("Failed to create consumer", ex);
|
throw new SQLException("Failed to create consumer", ex);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue