Merge pull request #262 from tidyjiang8/dev-jch
[rust connector] support subscribe
This commit is contained in:
commit
fc858b0d5a
|
@ -10,7 +10,11 @@ curl https://sh.rustup.rs -sSf | sh
|
|||
|
||||
## Run with Sample
|
||||
|
||||
Build and run:
|
||||
Build and run basic sample:
|
||||
```
|
||||
cargo run --example demo
|
||||
```
|
||||
Build and run subscribe sample:
|
||||
```
|
||||
cargo run --example subscribe
|
||||
```
|
||||
|
|
|
@ -2,7 +2,8 @@ use std::process;
|
|||
use tdengine::Tdengine;
|
||||
|
||||
fn main() {
|
||||
let tde = Tdengine::new("127.0.0.1", "root", "taosdata", "", 0).unwrap_or_else(|err| {
|
||||
let tde = Tdengine::new("127.0.0.1", "root", "taosdata", "demo", 0)
|
||||
.unwrap_or_else(|err| {
|
||||
eprintln!("Can't create Tdengine: {}", err);
|
||||
process::exit(1)
|
||||
});
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
use std::process;
|
||||
use tdengine::Subscriber;
|
||||
|
||||
fn main() {
|
||||
let subscriber = Subscriber::new("127.0.0.1", "root", "taosdata", "demo", "m1", 0, 1000)
|
||||
.unwrap_or_else(|err| {
|
||||
eprintln!("Can't create Subscriber: {}", err);
|
||||
process::exit(1)
|
||||
});
|
||||
|
||||
loop {
|
||||
let row = subscriber.consume().unwrap_or_else(|err| {
|
||||
eprintln!("consume exit: {}", err);
|
||||
process::exit(1)
|
||||
});
|
||||
|
||||
subscriber.print_row(&row);
|
||||
}
|
||||
}
|
|
@ -1,76 +1,10 @@
|
|||
#![allow(unused)]
|
||||
#![allow(non_camel_case_types)]
|
||||
|
||||
use std::os::raw::c_void;
|
||||
use std::os::raw::c_char;
|
||||
use std::os::raw::c_int;
|
||||
use std::ffi::CString;
|
||||
use std::ffi::CStr;
|
||||
pub mod subscriber;
|
||||
pub use subscriber::*;
|
||||
|
||||
mod bindings;
|
||||
use bindings::*;
|
||||
pub mod tdengine;
|
||||
pub use tdengine::*;
|
||||
|
||||
pub struct Tdengine {
|
||||
conn: *mut c_void,
|
||||
}
|
||||
|
||||
/// - **TODO**: doc
|
||||
impl Tdengine {
|
||||
|
||||
//! - **TODO**: implement default param.
|
||||
//!
|
||||
//! > refer to https://stackoverflow.com/questions/24047686/default-function-arguments-in-rust
|
||||
pub fn new(ip: &str, username: &str, passwd: &str, db: &str, port: i32) -> Result<Tdengine, &'static str> {
|
||||
unsafe {
|
||||
taos_init();
|
||||
let mut conn = taos_connect(str_into_raw(ip),
|
||||
str_into_raw(username),
|
||||
str_into_raw(passwd),
|
||||
str_into_raw(db),
|
||||
port as c_int);
|
||||
if conn.is_null() {
|
||||
Err("connect error")
|
||||
} else {
|
||||
println!("connected to {}:{} user:{}, db:{}", ip, port, username, db);
|
||||
Ok(Tdengine {conn})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// - **TODO**: check error code
|
||||
pub fn query(self: &Tdengine, s: &str) {
|
||||
unsafe {
|
||||
if taos_query(self.conn, str_into_raw(s)) == 0 {
|
||||
println!("query '{}' ok", s);
|
||||
} else {
|
||||
println!("query '{}' error: {}", s, raw_into_str(taos_errstr(self.conn)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Tdengine {
|
||||
fn drop(&mut self) {
|
||||
unsafe {taos_close(self.conn);}
|
||||
}
|
||||
}
|
||||
|
||||
fn str_into_raw(s: &str) -> *mut c_char {
|
||||
if s.is_empty() {
|
||||
0 as *mut c_char
|
||||
} else {
|
||||
CString::new(s).unwrap().into_raw()
|
||||
}
|
||||
}
|
||||
|
||||
fn raw_into_str<'a>(raw: *mut c_char) -> &'static str {
|
||||
unsafe {CStr::from_ptr(raw).to_str().unwrap()}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[test]
|
||||
fn it_works() {
|
||||
assert_eq!(2 + 2, 4);
|
||||
}
|
||||
}
|
||||
pub mod utils;
|
|
@ -0,0 +1,77 @@
|
|||
#![allow(non_camel_case_types)]
|
||||
#![allow(non_snake_case)]
|
||||
|
||||
#[path = "utils.rs"]
|
||||
mod utils;
|
||||
use utils::*;
|
||||
use utils::bindings::*;
|
||||
|
||||
use std::os::raw::{c_void, c_char, c_int, c_long};
|
||||
|
||||
pub struct Subscriber {
|
||||
tsub: *mut c_void,
|
||||
fields: *mut taosField,
|
||||
fcount: c_int,
|
||||
}
|
||||
|
||||
impl Subscriber {
|
||||
pub fn new(host: &str,
|
||||
username: &str,
|
||||
passwd: &str,
|
||||
db: &str,
|
||||
table:&str,
|
||||
time: i64,
|
||||
mseconds: i32
|
||||
) -> Result<Subscriber, &'static str> {
|
||||
unsafe {
|
||||
let mut tsub = taos_subscribe(str_into_raw(host),
|
||||
str_into_raw(username),
|
||||
str_into_raw(passwd),
|
||||
str_into_raw(db),
|
||||
str_into_raw(table),
|
||||
time as c_long,
|
||||
mseconds as c_int);
|
||||
if tsub.is_null() {
|
||||
return Err("subscribe error")
|
||||
}
|
||||
println!("subscribed to {} user:{}, db:{}, tb:{}, time:{}, mseconds:{}",
|
||||
host, username, db, table, time, mseconds);
|
||||
|
||||
let mut fields = taos_fetch_subfields(tsub);
|
||||
if fields.is_null() {
|
||||
taos_unsubscribe(tsub);
|
||||
return Err("fetch subfields error")
|
||||
}
|
||||
|
||||
let fcount = taos_subfields_count(tsub);
|
||||
if fcount == 0 {
|
||||
taos_unsubscribe(tsub);
|
||||
return Err("subfields count is 0")
|
||||
}
|
||||
|
||||
Ok(Subscriber{tsub, fields, fcount})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn consume(self: &Subscriber) -> Result<Row, &'static str> {
|
||||
unsafe {
|
||||
let taosRow = taos_consume(self.tsub);
|
||||
if taosRow.is_null() {
|
||||
return Err("consume error")
|
||||
}
|
||||
let taosRow= std::slice::from_raw_parts(taosRow, self.fcount as usize);
|
||||
let row = raw_into_row(self.fields, self.fcount, &taosRow);
|
||||
Ok(row)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn print_row(self: &Subscriber, row: &Row) {
|
||||
println!("{}", format_row(row));
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Subscriber {
|
||||
fn drop(&mut self) {
|
||||
unsafe {taos_unsubscribe(self.tsub);}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
#[path = "bindings.rs"]
|
||||
mod bindings;
|
||||
use bindings::*;
|
||||
|
||||
#[path = "utils.rs"]
|
||||
mod utils;
|
||||
use utils::*;
|
||||
|
||||
use std::os::raw::c_void;
|
||||
use std::os::raw::c_char;
|
||||
use std::os::raw::c_int;
|
||||
use std::os::raw::c_long;
|
||||
|
||||
pub struct Tdengine {
|
||||
conn: *mut c_void,
|
||||
}
|
||||
|
||||
/// - **TODO**: doc
|
||||
impl Tdengine {
|
||||
|
||||
//! - **TODO**: implement default param.
|
||||
//!
|
||||
//! > refer to https://stackoverflow.com/questions/24047686/default-function-arguments-in-rust
|
||||
pub fn new(ip: &str, username: &str, passwd: &str, db: &str, port: i32) -> Result<Tdengine, &'static str> {
|
||||
unsafe {
|
||||
taos_init();
|
||||
let mut conn = taos_connect(str_into_raw(ip),
|
||||
str_into_raw(username),
|
||||
str_into_raw(passwd),
|
||||
str_into_raw(db),
|
||||
port as c_int);
|
||||
if conn.is_null() {
|
||||
Err("connect error")
|
||||
} else {
|
||||
println!("connected to {}:{} user:{}, db:{}", ip, port, username, db);
|
||||
Ok(Tdengine {conn})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// - **TODO**: check error code
|
||||
pub fn query(self: &Tdengine, s: &str) {
|
||||
unsafe {
|
||||
if taos_query(self.conn, str_into_raw(s)) == 0 {
|
||||
println!("query '{}' ok", s);
|
||||
} else {
|
||||
println!("query '{}' error: {}", s, raw_into_str(taos_errstr(self.conn)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Tdengine {
|
||||
fn drop(&mut self) {
|
||||
unsafe {taos_close(self.conn);}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[test]
|
||||
fn it_works() {
|
||||
assert_eq!(2 + 2, 4);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,127 @@
|
|||
#[path = "bindings.rs"]
|
||||
pub mod bindings;
|
||||
use bindings::*;
|
||||
|
||||
use std::fmt;
|
||||
use std::fmt::Display;
|
||||
use std::os::raw::{c_void, c_char, c_int};
|
||||
use std::ffi::{CString, CStr};
|
||||
|
||||
// #[derive(Debug)]
|
||||
pub enum Field {
|
||||
tinyInt(i8),
|
||||
smallInt(i16),
|
||||
normalInt(i32),
|
||||
bigInt(i64),
|
||||
float(f32),
|
||||
double(f64),
|
||||
binary(String),
|
||||
timeStamp(i64),
|
||||
boolType(bool),
|
||||
}
|
||||
|
||||
|
||||
impl fmt::Display for Field {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match &*self {
|
||||
Field::tinyInt(v) => write!(f, "{}", v),
|
||||
Field::smallInt(v) => write!(f, "{}", v),
|
||||
Field::normalInt(v) => write!(f, "{}", v),
|
||||
Field::bigInt(v) => write!(f, "{}", v),
|
||||
Field::float(v) => write!(f, "{}", v),
|
||||
Field::double(v) => write!(f, "{}", v),
|
||||
Field::binary(v) => write!(f, "{}", v),
|
||||
Field::tinyInt(v) => write!(f, "{}", v),
|
||||
Field::timeStamp(v) => write!(f, "{}", v),
|
||||
Field::boolType(v) => write!(f, "{}", v),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pub type Fields = Vec<Field>;
|
||||
pub type Row = Vec<Field>;
|
||||
|
||||
pub fn format_row(row: &Row) -> String {
|
||||
let mut s = String::new();
|
||||
for field in row {
|
||||
s.push_str(format!("{} ", field).as_str());
|
||||
// println!("{}", field);
|
||||
}
|
||||
s
|
||||
}
|
||||
|
||||
pub fn str_into_raw(s: &str) -> *mut c_char {
|
||||
if s.is_empty() {
|
||||
0 as *mut c_char
|
||||
} else {
|
||||
CString::new(s).unwrap().into_raw()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn raw_into_str<'a>(raw: *mut c_char) -> &'static str {
|
||||
unsafe {CStr::from_ptr(raw).to_str().unwrap()}
|
||||
}
|
||||
|
||||
|
||||
pub fn raw_into_field(raw: *mut TAOS_FIELD, fcount: c_int) -> Vec<taosField> {
|
||||
let mut fields: Vec<taosField> = Vec::new();
|
||||
|
||||
for i in 0..fcount as isize {
|
||||
fields.push(
|
||||
taosField {
|
||||
name: unsafe {(*raw.offset(i as isize))}.name,
|
||||
bytes: unsafe {(*raw.offset(i as isize))}.bytes,
|
||||
type_: unsafe {(*raw.offset(i as isize))}.type_,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/// TODO: error[E0382]: use of moved value: `fields`
|
||||
// for field in &fields {
|
||||
// println!("type: {}, bytes: {}", field.type_, field.bytes);
|
||||
// }
|
||||
|
||||
fields
|
||||
}
|
||||
|
||||
pub fn raw_into_row(fields: *mut TAOS_FIELD, fcount: c_int, raw_row: &[*mut c_void]) -> Row {
|
||||
let mut row: Row= Vec::new();
|
||||
let fields = raw_into_field(fields, fcount);
|
||||
|
||||
for (i, field) in fields.iter().enumerate() {
|
||||
// println!("index: {}, type: {}, bytes: {}", i, field.type_, field.bytes);
|
||||
unsafe {
|
||||
match field.type_ as u32 {
|
||||
TSDB_DATA_TYPE_TINYINT => {
|
||||
row.push(Field::tinyInt(*(raw_row[i] as *mut i8)));
|
||||
}
|
||||
TSDB_DATA_TYPE_SMALLINT => {
|
||||
row.push(Field::smallInt(*(raw_row[i] as *mut i16)));
|
||||
}
|
||||
TSDB_DATA_TYPE_INT => {
|
||||
row.push(Field::normalInt(*(raw_row[i] as *mut i32)));
|
||||
}
|
||||
TSDB_DATA_TYPE_BIGINT => {
|
||||
row.push(Field::bigInt(*(raw_row[i] as *mut i64)));
|
||||
}
|
||||
TSDB_DATA_TYPE_FLOAT => {
|
||||
row.push(Field::float(*(raw_row[i] as *mut f32)));
|
||||
}
|
||||
TSDB_DATA_TYPE_DOUBLE => {
|
||||
row.push(Field::double(*(raw_row[i] as *mut f64)));
|
||||
}
|
||||
TSDB_DATA_TYPE_BINARY | TSDB_DATA_TYPE_NCHAR => {
|
||||
// row.push(Field::binary(*(raw_row[i] as *mut f64)));
|
||||
}
|
||||
TSDB_DATA_TYPE_TIMESTAMP => {
|
||||
row.push(Field::timeStamp(*(raw_row[i] as *mut i64)));
|
||||
}
|
||||
TSDB_DATA_TYPE_BOOL => {
|
||||
// row.push(Field::boolType(*(raw_row[i] as *mut i8) as bool));
|
||||
}
|
||||
_ => println!(""),
|
||||
}
|
||||
}
|
||||
}
|
||||
row
|
||||
}
|
Loading…
Reference in New Issue