Merge pull request #707 from robotspace/develop
Add support for continuous query in Lua connector.
This commit is contained in:
commit
4decb7b88d
|
@ -7,8 +7,15 @@
|
|||
#include <lualib.h>
|
||||
#include <taos.h>
|
||||
|
||||
static int l_connect(lua_State *L)
|
||||
{
|
||||
struct cb_param{
|
||||
lua_State* state;
|
||||
int callback;
|
||||
void * stream;
|
||||
};
|
||||
|
||||
|
||||
|
||||
static int l_connect(lua_State *L){
|
||||
TAOS * taos;
|
||||
char *host = lua_tostring(L, 1);
|
||||
char *user = lua_tostring(L, 2);
|
||||
|
@ -29,6 +36,7 @@ static int l_connect(lua_State *L)
|
|||
lua_pushstring(L, taos_errstr(taos));
|
||||
lua_setfield(L, table_index, "error");
|
||||
lua_pushlightuserdata(L,NULL);
|
||||
lua_setfield(L, table_index, "conn");
|
||||
}else{
|
||||
printf("success to connect server\n");
|
||||
lua_pushnumber(L, 0);
|
||||
|
@ -49,7 +57,7 @@ static int l_query(lua_State *L){
|
|||
lua_newtable(L);
|
||||
int table_index = lua_gettop(L);
|
||||
|
||||
printf("receive command:%s\r\n",s);
|
||||
// printf("receive command:%s\r\n",s);
|
||||
if(taos_query(taos, s)!=0){
|
||||
printf("failed, reason:%s\n", taos_errstr(taos));
|
||||
lua_pushnumber(L, -1);
|
||||
|
@ -78,8 +86,12 @@ static int l_query(lua_State *L){
|
|||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||
char temp[256];
|
||||
|
||||
int affectRows = taos_affected_rows(taos);
|
||||
// printf(" affect rows:%d\r\n", affectRows);
|
||||
lua_pushnumber(L, 0);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushinteger(L, affectRows);
|
||||
lua_setfield(L, table_index, "affected");
|
||||
lua_newtable(L);
|
||||
|
||||
while ((row = taos_fetch_row(result))) {
|
||||
|
@ -95,7 +107,7 @@ static int l_query(lua_State *L){
|
|||
}
|
||||
|
||||
lua_pushstring(L,fields[i].name);
|
||||
//printf("field name:%s,type:%d\n",fields[i].name,fields[i].type);
|
||||
|
||||
switch (fields[i].type) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
lua_pushinteger(L,*((char *)row[i]));
|
||||
|
@ -142,6 +154,115 @@ static int l_query(lua_State *L){
|
|||
return 1;
|
||||
}
|
||||
|
||||
void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
|
||||
|
||||
struct cb_param* p = (struct cb_param*) param;
|
||||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||
int numFields = taos_num_fields(result);
|
||||
|
||||
printf("\n\r-----------------------------------------------------------------------------------\n");
|
||||
|
||||
// printf("r:%d, L:%d\n",p->callback, p->state);
|
||||
|
||||
lua_State *L = p->state;
|
||||
lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback);
|
||||
|
||||
lua_newtable(L);
|
||||
|
||||
for (int i = 0; i < numFields; ++i) {
|
||||
if (row[i] == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
lua_pushstring(L,fields[i].name);
|
||||
|
||||
switch (fields[i].type) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
lua_pushinteger(L,*((char *)row[i]));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
lua_pushinteger(L,*((short *)row[i]));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
lua_pushinteger(L,*((int *)row[i]));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
lua_pushinteger(L,*((int64_t *)row[i]));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
lua_pushnumber(L,*((float *)row[i]));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
lua_pushnumber(L,*((double *)row[i]));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
lua_pushstring(L,(char *)row[i]);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
lua_pushinteger(L,*((int64_t *)row[i]));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
lua_pushinteger(L,*((char *)row[i]));
|
||||
break;
|
||||
default:
|
||||
lua_pushnil(L);
|
||||
break;
|
||||
}
|
||||
|
||||
lua_settable(L, -3);
|
||||
}
|
||||
|
||||
lua_call(L, 1, 0);
|
||||
|
||||
printf("-----------------------------------------------------------------------------------\n\r");
|
||||
}
|
||||
|
||||
static int l_open_stream(lua_State *L){
|
||||
int r = luaL_ref(L, LUA_REGISTRYINDEX);
|
||||
TAOS * taos = lua_topointer(L,1);
|
||||
char * sqlstr = lua_tostring(L,2);
|
||||
int stime = luaL_checknumber(L,3);
|
||||
|
||||
lua_newtable(L);
|
||||
int table_index = lua_gettop(L);
|
||||
|
||||
struct cb_param *p = malloc(sizeof(struct cb_param));
|
||||
p->state = L;
|
||||
p->callback=r;
|
||||
// printf("r:%d, L:%d\n",r,L);
|
||||
void * s = taos_open_stream(taos,sqlstr,stream_cb,stime,p,NULL);
|
||||
if (s == NULL) {
|
||||
printf("failed to open stream, reason:%s\n", taos_errstr(taos));
|
||||
free(p);
|
||||
lua_pushnumber(L, -1);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushstring(L, taos_errstr(taos));
|
||||
lua_setfield(L, table_index, "error");
|
||||
lua_pushlightuserdata(L,NULL);
|
||||
lua_setfield(L, table_index, "stream");
|
||||
}else{
|
||||
// printf("success to open stream\n");
|
||||
lua_pushnumber(L, 0);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushstring(L, taos_errstr(taos));
|
||||
lua_setfield(L, table_index, "error");
|
||||
p->stream = s;
|
||||
lua_pushlightuserdata(L,p);
|
||||
lua_setfield(L, table_index, "stream");//stream has different content in lua and c.
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int l_close_stream(lua_State *L){
|
||||
//TODO:get stream and free cb_param
|
||||
struct cb_param *p = lua_touserdata(L,1);
|
||||
taos_close_stream(p->stream);
|
||||
free(p);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int l_close(lua_State *L){
|
||||
TAOS * taos= lua_topointer(L,1);
|
||||
lua_newtable(L);
|
||||
|
@ -166,6 +287,8 @@ static const struct luaL_Reg lib[] = {
|
|||
{"connect", l_connect},
|
||||
{"query", l_query},
|
||||
{"close", l_close},
|
||||
{"open_stream", l_open_stream},
|
||||
{"close_stream", l_close_stream},
|
||||
{NULL, NULL}
|
||||
};
|
||||
|
||||
|
|
|
@ -35,10 +35,12 @@ if res.code ~=0 then
|
|||
return
|
||||
end
|
||||
|
||||
res = driver.query(conn,"insert into m1 values (1592222222222,0,'robotspace'), (1592222222223,1,'Hilink'),(1592222222224,2,'Harmony')")
|
||||
res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001',0,'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')")
|
||||
if res.code ~=0 then
|
||||
print(res.error)
|
||||
return
|
||||
else
|
||||
print("insert successfully, affected:"..res.affected)
|
||||
end
|
||||
|
||||
res = driver.query(conn,"select * from m1")
|
||||
|
@ -55,4 +57,69 @@ else
|
|||
end
|
||||
end
|
||||
|
||||
res = driver.query(conn,"CREATE TABLE thermometer (ts timestamp, degree double) TAGS(location binary(20), type int)")
|
||||
if res.code ~=0 then
|
||||
print(res.error)
|
||||
return
|
||||
end
|
||||
res = driver.query(conn,"CREATE TABLE therm1 USING thermometer TAGS ('beijing', 1)")
|
||||
if res.code ~=0 then
|
||||
print(res.error)
|
||||
return
|
||||
end
|
||||
res = driver.query(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.001', 20),('2019-09-01 00:00:00.002', 21)")
|
||||
|
||||
if res.code ~=0 then
|
||||
print(res.error)
|
||||
return
|
||||
else
|
||||
print("insert successfully, affected:"..res.affected)
|
||||
end
|
||||
|
||||
res = driver.query(conn,"SELECT COUNT(*) count, AVG(degree) AS av, MAX(degree), MIN(degree) FROM thermometer WHERE location='beijing' or location='tianjin' GROUP BY location, type")
|
||||
if res.code ~=0 then
|
||||
print("select error:"..res.error)
|
||||
return
|
||||
else
|
||||
print("in lua, result:")
|
||||
for i = 1, #(res.item) do
|
||||
print("res:"..res.item[i].count)
|
||||
end
|
||||
end
|
||||
|
||||
function callback(t)
|
||||
print("continuous query result:")
|
||||
for key, value in pairs(t) do
|
||||
print("key:"..key..", value:"..value)
|
||||
end
|
||||
end
|
||||
|
||||
local stream
|
||||
res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0,callback)
|
||||
if res.code ~=0 then
|
||||
print("open stream error:"..res.error)
|
||||
return
|
||||
else
|
||||
print("openstream ok")
|
||||
stream = res.stream
|
||||
end
|
||||
|
||||
--From now on we begin continous query in an definite (infinite if you want) loop.
|
||||
local loop_index = 0
|
||||
while loop_index < 20 do
|
||||
local t = os.time()*1000
|
||||
local v = loop_index
|
||||
res = driver.query(conn,string.format("INSERT INTO therm1 VALUES (%d, %d)",t,v))
|
||||
|
||||
if res.code ~=0 then
|
||||
print(res.error)
|
||||
return
|
||||
else
|
||||
print("insert successfully, affected:"..res.affected)
|
||||
end
|
||||
os.execute("sleep " .. 1)
|
||||
loop_index = loop_index + 1
|
||||
end
|
||||
|
||||
driver.close_stream(stream)
|
||||
driver.close(conn)
|
||||
|
|
Loading…
Reference in New Issue