Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/stream_compression

This commit is contained in:
Hongze Cheng 2022-09-21 15:01:23 +08:00
commit 1d76bd6d03
3 changed files with 87 additions and 88 deletions

View File

@ -1,11 +1,11 @@
#include <stdio.h>
#include <math.h> #include <math.h>
#include <stdarg.h> #include <stdarg.h>
#include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "../../../../include/client/taos.h"
#include "lauxlib.h"
#include "lua.h" #include "lua.h"
#include "lauxlib.h"
#include "lualib.h" #include "lualib.h"
#include <taos.h>
struct cb_param{ struct cb_param{
lua_State* state; lua_State* state;
@ -35,7 +35,7 @@ static int l_connect(lua_State *L){
} }
lua_getfield(L, 1, "port"); lua_getfield(L, 1, "port");
if (lua_isnumber(L,-1)){ if (lua_isnumber(L, -1)){
port = lua_tonumber(L, -1); port = lua_tonumber(L, -1);
//printf("port = %d\n", port); //printf("port = %d\n", port);
} }
@ -102,7 +102,7 @@ static int l_query(lua_State *L){
printf("failed, reason:%s\n", taos_errstr(result)); printf("failed, reason:%s\n", taos_errstr(result));
lua_pushinteger(L, -1); lua_pushinteger(L, -1);
lua_setfield(L, table_index, "code"); lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos)); lua_pushstring(L, taos_errstr(result));
lua_setfield(L, table_index, "error"); lua_setfield(L, table_index, "error");
return 1; return 1;
@ -113,7 +113,6 @@ static int l_query(lua_State *L){
int rows = 0; int rows = 0;
int num_fields = taos_field_count(result); int num_fields = taos_field_count(result);
const TAOS_FIELD *fields = taos_fetch_fields(result); const TAOS_FIELD *fields = taos_fetch_fields(result);
//char temp[256];
const int affectRows = taos_affected_rows(result); const int affectRows = taos_affected_rows(result);
// printf(" affect rows:%d\r\n", affectRows); // printf(" affect rows:%d\r\n", affectRows);
@ -122,7 +121,7 @@ static int l_query(lua_State *L){
lua_pushinteger(L, affectRows); lua_pushinteger(L, affectRows);
lua_setfield(L, table_index, "affected"); lua_setfield(L, table_index, "affected");
lua_newtable(L); lua_newtable(L);
while ((row = taos_fetch_row(result))) { while ((row = taos_fetch_row(result))) {
//printf("row index:%d\n",rows); //printf("row index:%d\n",rows);
rows++; rows++;
@ -136,17 +135,21 @@ static int l_query(lua_State *L){
} }
lua_pushstring(L,fields[i].name); lua_pushstring(L,fields[i].name);
int32_t* length = taos_fetch_lengths(result);
switch (fields[i].type) { switch (fields[i].type) {
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
lua_pushinteger(L,*((char *)row[i])); lua_pushinteger(L,*((char *)row[i]));
break; break;
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
lua_pushinteger(L,*((short *)row[i])); lua_pushinteger(L,*((short *)row[i]));
break; break;
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
lua_pushinteger(L,*((int *)row[i])); lua_pushinteger(L,*((int *)row[i]));
break; break;
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
lua_pushinteger(L,*((int64_t *)row[i])); lua_pushinteger(L,*((int64_t *)row[i]));
break; break;
@ -156,9 +159,11 @@ static int l_query(lua_State *L){
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
lua_pushnumber(L,*((double *)row[i])); lua_pushnumber(L,*((double *)row[i]));
break; break;
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
lua_pushstring(L,(char *)row[i]); //printf("type:%d, max len:%d, current len:%d\n",fields[i].type, fields[i].bytes, length[i]);
lua_pushlstring(L,(char *)row[i], length[i]);
break; break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
lua_pushinteger(L,*((int64_t *)row[i])); lua_pushinteger(L,*((int64_t *)row[i]));
@ -166,6 +171,7 @@ static int l_query(lua_State *L){
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
lua_pushinteger(L,*((char *)row[i])); lua_pushinteger(L,*((char *)row[i]));
break; break;
case TSDB_DATA_TYPE_NULL:
default: default:
lua_pushnil(L); lua_pushnil(L);
break; break;
@ -297,51 +303,6 @@ void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
// printf("-----------------------------------------------------------------------------------\n\r"); // printf("-----------------------------------------------------------------------------------\n\r");
} }
static int l_open_stream(lua_State *L){
int r = luaL_ref(L, LUA_REGISTRYINDEX);
TAOS * taos = (TAOS*)lua_topointer(L,1);
const char * sqlstr = lua_tostring(L,2);
int stime = luaL_checknumber(L,3);
lua_newtable(L);
int table_index = lua_gettop(L);
struct cb_param *p = malloc(sizeof(struct cb_param));
p->state = L;
p->callback=r;
// printf("r:%d, L:%d\n",r,L);
void * s = taos_open_stream(taos,sqlstr,stream_cb,stime,p,NULL);
if (s == NULL) {
printf("failed to open stream, reason:%s\n", taos_errstr(taos));
free(p);
lua_pushnumber(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
lua_pushlightuserdata(L,NULL);
lua_setfield(L, table_index, "stream");
}else{
// printf("success to open stream\n");
lua_pushnumber(L, 0);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
p->stream = s;
lua_pushlightuserdata(L,p);
lua_setfield(L, table_index, "stream");//stream has different content in lua and c.
}
return 1;
}
static int l_close_stream(lua_State *L){
//TODO:get stream and free cb_param
struct cb_param *p = lua_touserdata(L,1);
taos_close_stream(p->stream);
free(p);
return 0;
}
static int l_close(lua_State *L){ static int l_close(lua_State *L){
TAOS *taos= (TAOS*)lua_topointer(L,1); TAOS *taos= (TAOS*)lua_topointer(L,1);
lua_newtable(L); lua_newtable(L);
@ -367,8 +328,6 @@ static const struct luaL_Reg lib[] = {
{"query", l_query}, {"query", l_query},
{"query_a",l_async_query}, {"query_a",l_async_query},
{"close", l_close}, {"close", l_close},
{"open_stream", l_open_stream},
{"close_stream", l_close_stream},
{NULL, NULL} {NULL, NULL}
}; };

View File

@ -5,7 +5,7 @@
#include <lua.h> #include <lua.h>
#include <lauxlib.h> #include <lauxlib.h>
#include <lualib.h> #include <lualib.h>
#include "taos.h" #include <taos.h>
struct cb_param{ struct cb_param{
lua_State* state; lua_State* state;
@ -60,6 +60,8 @@ static int l_connect(lua_State *L){
lua_settop(L,0); lua_settop(L,0);
taos_init();
lua_newtable(L); lua_newtable(L);
int table_index = lua_gettop(L); int table_index = lua_gettop(L);

View File

@ -9,6 +9,50 @@ local config = {
max_packet_size = 1024 * 1024 max_packet_size = 1024 * 1024
} }
function dump(obj)
local getIndent, quoteStr, wrapKey, wrapVal, dumpObj
getIndent = function(level)
return string.rep("\t", level)
end
quoteStr = function(str)
return '"' .. string.gsub(str, '"', '\\"') .. '"'
end
wrapKey = function(val)
if type(val) == "number" then
return "[" .. val .. "]"
elseif type(val) == "string" then
return "[" .. quoteStr(val) .. "]"
else
return "[" .. tostring(val) .. "]"
end
end
wrapVal = function(val, level)
if type(val) == "table" then
return dumpObj(val, level)
elseif type(val) == "number" then
return val
elseif type(val) == "string" then
return quoteStr(val)
else
return tostring(val)
end
end
dumpObj = function(obj, level)
if type(obj) ~= "table" then
return wrapVal(obj)
end
level = level + 1
local tokens = {}
tokens[#tokens + 1] = "{"
for k, v in pairs(obj) do
tokens[#tokens + 1] = getIndent(level) .. wrapKey(k) .. " = " .. wrapVal(v, level) .. ","
end
tokens[#tokens + 1] = getIndent(level - 1) .. "}"
return table.concat(tokens, "\n")
end
return dumpObj(obj, 0)
end
local conn local conn
local res = driver.connect(config) local res = driver.connect(config)
if res.code ~=0 then if res.code ~=0 then
@ -37,7 +81,7 @@ else
print("select db--- pass.") print("select db--- pass.")
end end
res = driver.query(conn,"create table m1 (ts timestamp, speed int,owner binary(20))") res = driver.query(conn,"create table m1 (ts timestamp, speed int, owner binary(20), mark nchar(30))")
if res.code ~=0 then if res.code ~=0 then
print("create table---failed: "..res.error) print("create table---failed: "..res.error)
return return
@ -45,7 +89,7 @@ else
print("create table--- pass.") print("create table--- pass.")
end end
res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001',0,'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')") res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001', 0, 'robotspace', '世界人民大团结万岁'), ('2019-09-01 00:00:00.002', 1, 'Hilink', '⾾⾿⿀⿁⿂⿃⿄⿅⿆⿇⿈⿉⿊⿋⿌⿍⿎⿏⿐⿑⿒⿓⿔⿕'),('2019-09-01 00:00:00.003', 2, 'Harmony', '₠₡₢₣₤₥₦₧₨₩₪₫€₭₮₯₰₱₲₳₴₵')")
if res.code ~=0 then if res.code ~=0 then
print("insert records failed: "..res.error) print("insert records failed: "..res.error)
return return
@ -64,21 +108,25 @@ if res.code ~=0 then
return return
else else
if (#(res.item) == 3) then if (#(res.item) == 3) then
print("select--- pass") print("select--- pass")
print(res.item[1].mark)
print(res.item[2].mark)
print(res.item[3].mark)
else else
print("select--- failed: expect 3 affected records, actually received "..#(res.item)) print("select--- failed: expect 3 affected records, actually received "..#(res.item))
end end
end end
res = driver.query(conn,"CREATE TABLE thermometer (ts timestamp, degree double) TAGS(location binary(20), type int)") res = driver.query(conn,"create table thermometer (ts timestamp, degree double) tags(location binary(20), type int)")
if res.code ~=0 then if res.code ~=0 then
print(res.error) print(res.error)
return return
else else
print("create super table--- pass") print("create super table--- pass")
end end
res = driver.query(conn,"CREATE TABLE therm1 USING thermometer TAGS ('beijing', 1)") res = driver.query(conn,"create table therm1 using thermometer tags ('beijing', 1)")
if res.code ~=0 then if res.code ~=0 then
print(res.error) print(res.error)
return return
@ -86,7 +134,7 @@ else
print("create table--- pass") print("create table--- pass")
end end
res = driver.query(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.001', 20),('2019-09-01 00:00:00.002', 21)") res = driver.query(conn,"insert into therm1 values ('2019-09-01 00:00:00.001', 20),('2019-09-01 00:00:00.002', 21)")
if res.code ~=0 then if res.code ~=0 then
print(res.error) print(res.error)
@ -99,14 +147,14 @@ else
end end
end end
res = driver.query(conn,"SELECT COUNT(*) count, AVG(degree) AS av, MAX(degree), MIN(degree) FROM thermometer WHERE location='beijing' or location='tianjin' GROUP BY location, type") res = driver.query(conn,"select count(*) as cnt, avg(degree) as av, max(degree), min(degree) from thermometer where location='beijing' or location='tianjin' group by location, type")
if res.code ~=0 then if res.code ~=0 then
print("select from super table--- failed:"..res.error) print("select from super table--- failed:"..res.error)
return return
else else
print("select from super table--- pass") print("select from super table--- pass")
for i = 1, #(res.item) do for i = 1, #(res.item) do
print("res:"..res.item[i].count) print("res:"..res.item[i].cnt)
end end
end end
@ -125,33 +173,16 @@ function async_query_callback(res)
end end
end end
driver.query_a(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback) driver.query_a(conn,"insert into therm1 values ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback)
res = driver.query(conn, "create stream stream_avg_degree into avg_degree as select avg(degree) from thermometer interval(5s) sliding(1s)")
function stream_callback(t) print("From now on we start continous insertion in an definite (infinite if you want) loop.")
print("------------------------")
print("continuous query result:")
for key, value in pairs(t) do
print("key:"..key..", value:"..value)
end
end
local stream
res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0, stream_callback)
if res.code ~=0 then
print("open stream--- failed:"..res.error)
return
else
print("open stream--- pass")
stream = res.stream
end
print("From now on we start continous insert in an definite (infinite if you want) loop.")
local loop_index = 0 local loop_index = 0
while loop_index < 30 do while loop_index < 30 do
local t = os.time()*1000 local t = os.time()*1000
local v = loop_index local v = loop_index
res = driver.query(conn,string.format("INSERT INTO therm1 VALUES (%d, %d)",t,v)) res = driver.query(conn,string.format("insert into therm1 values (%d, %d)",t,v))
if res.code ~=0 then if res.code ~=0 then
print("continous insertion--- failed:" .. res.error) print("continous insertion--- failed:" .. res.error)
@ -159,10 +190,17 @@ while loop_index < 30 do
else else
--print("insert successfully, affected:"..res.affected) --print("insert successfully, affected:"..res.affected)
end end
local res1 = driver.query(conn, string.format("select last(*) from avg_degree"))
if res1.code ~=0 then
print("select failed: "..res1.error)
return
else
-- print(dump(res1))
if(#res1.item > 0) then print("avg_degree: " .. res1.item[1]["last(avg(degree))"]) end
end
os.execute("sleep " .. 1) os.execute("sleep " .. 1)
loop_index = loop_index + 1 loop_index = loop_index + 1
end end
driver.close_stream(stream)
driver.close(conn) driver.close(conn)