Add support for async insert and connection pool. (#6244)
* Add support for async query. Only insert result is parsed. * Add support for connection pool. * Add one case for lua connector in smoke test case list. * Build dymanic library for lua connector before smoke test.
This commit is contained in:
parent
09c4c28169
commit
68aeccbe7f
|
@ -6,9 +6,9 @@ events {
|
|||
}
|
||||
|
||||
http {
|
||||
lua_package_path '$prefix/lua/?.lua;$prefix/rest/?.lua;/blah/?.lua;;';
|
||||
lua_package_path '$prefix/lua/?.lua;$prefix/rest/?.lua;$prefix/rest/?/init.lua;;';
|
||||
lua_package_cpath "$prefix/so/?.so;;";
|
||||
lua_code_cache off;
|
||||
lua_code_cache on;
|
||||
server {
|
||||
listen 7000;
|
||||
server_name restapi;
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
local config = {
|
||||
host = "127.0.0.1",
|
||||
port = 6030,
|
||||
database = "",
|
||||
user = "root",
|
||||
password = "taosdata",
|
||||
max_packet_size = 1024 * 1024 ,
|
||||
connection_pool_size = 64
|
||||
}
|
||||
return config
|
|
@ -0,0 +1,72 @@
|
|||
local _M = {}
|
||||
local driver = require "luaconnector51"
|
||||
local water_mark = 0
|
||||
local occupied = 0
|
||||
local connection_pool = {}
|
||||
|
||||
function _M.new(o,config)
|
||||
o = o or {}
|
||||
o.connection_pool = connection_pool
|
||||
o.water_mark = water_mark
|
||||
o.occupied = occupied
|
||||
if #connection_pool == 0 then
|
||||
|
||||
for i = 1, config.connection_pool_size do
|
||||
local res = driver.connect(config)
|
||||
if res.code ~= 0 then
|
||||
ngx.log(ngx.ERR, "connect--- failed:"..res.error)
|
||||
return nil
|
||||
else
|
||||
local object = {obj = res.conn, state = 0}
|
||||
table.insert(o.connection_pool,i, object)
|
||||
ngx.log(ngx.INFO, "add connection, now pool size:"..#(o.connection_pool))
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
return setmetatable(o, { __index = _M })
|
||||
end
|
||||
|
||||
function _M:get_connection()
|
||||
|
||||
local connection_obj
|
||||
|
||||
for i = 1, #connection_pool do
|
||||
connection_obj = connection_pool[i]
|
||||
if connection_obj.state == 0 then
|
||||
connection_obj.state = 1
|
||||
occupied = occupied +1
|
||||
if occupied > water_mark then
|
||||
water_mark = occupied
|
||||
end
|
||||
return connection_obj["obj"]
|
||||
end
|
||||
end
|
||||
|
||||
ngx.log(ngx.ERR,"ALERT! NO FREE CONNECTION.")
|
||||
|
||||
return nil
|
||||
end
|
||||
|
||||
function _M:get_water_mark()
|
||||
|
||||
return water_mark
|
||||
end
|
||||
|
||||
function _M:release_connection(conn)
|
||||
|
||||
local connection_obj
|
||||
|
||||
for i = 1, #connection_pool do
|
||||
connection_obj = connection_pool[i]
|
||||
|
||||
if connection_obj["obj"] == conn then
|
||||
connection_obj["state"] = 0
|
||||
occupied = occupied -1
|
||||
return
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
return _M
|
|
@ -1,26 +1,11 @@
|
|||
local driver = require "luaconnector51"
|
||||
local cjson = require "cjson"
|
||||
local Pool = require "tdpool"
|
||||
local config = require "config"
|
||||
ngx.say("start time:"..os.time())
|
||||
|
||||
|
||||
local config = {
|
||||
host = "127.0.0.1",
|
||||
port = 6030,
|
||||
database = "",
|
||||
user = "root",
|
||||
password = "taosdata",
|
||||
max_packet_size = 1024 * 1024
|
||||
}
|
||||
|
||||
local conn
|
||||
local res = driver.connect(config)
|
||||
if res.code ~=0 then
|
||||
ngx.say("connect--- failed: "..res.error)
|
||||
return
|
||||
else
|
||||
conn = res.conn
|
||||
ngx.say("connect--- pass.")
|
||||
end
|
||||
local pool = Pool.new(Pool,config)
|
||||
local conn = pool:get_connection()
|
||||
|
||||
local res = driver.query(conn,"drop database if exists nginx")
|
||||
if res.code ~=0 then
|
||||
|
@ -51,7 +36,7 @@ else
|
|||
ngx.say("create table--- pass.")
|
||||
end
|
||||
|
||||
res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001',0,'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')")
|
||||
res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001', 0, 'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')")
|
||||
if res.code ~=0 then
|
||||
ngx.say("insert records failed: "..res.error)
|
||||
return
|
||||
|
@ -77,7 +62,29 @@ else
|
|||
end
|
||||
|
||||
end
|
||||
driver.close(conn)
|
||||
ngx.say("end time:"..os.time())
|
||||
--ngx.log(ngx.ERR,"in test file.")
|
||||
|
||||
local flag = false
|
||||
function query_callback(res)
|
||||
if res.code ~=0 then
|
||||
ngx.say("async_query_callback--- failed:"..res.error)
|
||||
else
|
||||
if(res.affected == 3) then
|
||||
ngx.say("async_query_callback, insert records--- pass")
|
||||
else
|
||||
ngx.say("async_query_callback, insert records---failed: expect 3 affected records, actually affected "..res.affected)
|
||||
end
|
||||
end
|
||||
flag = true
|
||||
end
|
||||
|
||||
driver.query_a(conn,"insert into m1 values ('2019-09-01 00:00:00.001', 3, 'robotspace'),('2019-09-01 00:00:00.006', 4, 'Hilink'),('2019-09-01 00:00:00.007', 6, 'Harmony')", query_callback)
|
||||
|
||||
while not flag do
|
||||
-- ngx.say("i am here once...")
|
||||
ngx.sleep(0.001) -- time unit is second
|
||||
end
|
||||
|
||||
ngx.say("pool water_mark:"..pool:get_water_mark())
|
||||
|
||||
pool:release_connection(conn)
|
||||
ngx.say("end time:"..os.time())
|
||||
|
|
Binary file not shown.
|
@ -13,6 +13,11 @@ struct cb_param{
|
|||
void * stream;
|
||||
};
|
||||
|
||||
struct async_query_callback_param{
|
||||
lua_State* state;
|
||||
int callback;
|
||||
};
|
||||
|
||||
static int l_connect(lua_State *L){
|
||||
TAOS * taos=NULL;
|
||||
const char* host;
|
||||
|
@ -23,7 +28,7 @@ static int l_connect(lua_State *L){
|
|||
|
||||
luaL_checktype(L, 1, LUA_TTABLE);
|
||||
|
||||
lua_getfield(L,-1,"host");
|
||||
lua_getfield(L, 1,"host");
|
||||
if (lua_isstring(L,-1)){
|
||||
host = lua_tostring(L, -1);
|
||||
// printf("host = %s\n", host);
|
||||
|
@ -178,6 +183,58 @@ static int l_query(lua_State *L){
|
|||
return 1;
|
||||
}
|
||||
|
||||
void async_query_callback(void *param, TAOS_RES *result, int code){
|
||||
struct async_query_callback_param* p = (struct async_query_callback_param*) param;
|
||||
|
||||
//printf("\nin c,numfields:%d\n", numFields);
|
||||
//printf("\nin c, code:%d\n", code);
|
||||
|
||||
lua_State *L = p->state;
|
||||
lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback);
|
||||
lua_newtable(L);
|
||||
int table_index = lua_gettop(L);
|
||||
if( code < 0){
|
||||
printf("failed, reason:%s\n", taos_errstr(result));
|
||||
lua_pushinteger(L, -1);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushstring(L,"something is wrong");// taos_errstr(taos));
|
||||
lua_setfield(L, table_index, "error");
|
||||
}else{
|
||||
//printf("success to async query.\n");
|
||||
const int affectRows = taos_affected_rows(result);
|
||||
//printf(" affect rows:%d\r\n", affectRows);
|
||||
lua_pushinteger(L, 0);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushinteger(L, affectRows);
|
||||
lua_setfield(L, table_index, "affected");
|
||||
}
|
||||
|
||||
lua_call(L, 1, 0);
|
||||
}
|
||||
|
||||
static int l_async_query(lua_State *L){
|
||||
int r = luaL_ref(L, LUA_REGISTRYINDEX);
|
||||
TAOS * taos = (TAOS*)lua_topointer(L,1);
|
||||
const char * sqlstr = lua_tostring(L,2);
|
||||
// int stime = luaL_checknumber(L,3);
|
||||
|
||||
lua_newtable(L);
|
||||
int table_index = lua_gettop(L);
|
||||
|
||||
struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param));
|
||||
p->state = L;
|
||||
p->callback=r;
|
||||
// printf("r:%d, L:%d\n",r,L);
|
||||
taos_query_a(taos,sqlstr,async_query_callback,p);
|
||||
|
||||
lua_pushnumber(L, 0);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushstring(L, "ok");
|
||||
lua_setfield(L, table_index, "error");
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
|
||||
struct cb_param* p = (struct cb_param*) param;
|
||||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||
|
@ -308,6 +365,7 @@ static int l_close(lua_State *L){
|
|||
static const struct luaL_Reg lib[] = {
|
||||
{"connect", l_connect},
|
||||
{"query", l_query},
|
||||
{"query_a",l_async_query},
|
||||
{"close", l_close},
|
||||
{"open_stream", l_open_stream},
|
||||
{"close_stream", l_close_stream},
|
||||
|
|
|
@ -13,6 +13,11 @@ struct cb_param{
|
|||
void * stream;
|
||||
};
|
||||
|
||||
struct async_query_callback_param{
|
||||
lua_State* state;
|
||||
int callback;
|
||||
};
|
||||
|
||||
static int l_connect(lua_State *L){
|
||||
TAOS * taos=NULL;
|
||||
const char* host;
|
||||
|
@ -56,6 +61,7 @@ static int l_connect(lua_State *L){
|
|||
lua_settop(L,0);
|
||||
|
||||
taos_init();
|
||||
|
||||
lua_newtable(L);
|
||||
int table_index = lua_gettop(L);
|
||||
|
||||
|
@ -177,6 +183,58 @@ static int l_query(lua_State *L){
|
|||
return 1;
|
||||
}
|
||||
|
||||
void async_query_callback(void *param, TAOS_RES *result, int code){
|
||||
struct async_query_callback_param* p = (struct async_query_callback_param*) param;
|
||||
|
||||
//printf("\nin c,numfields:%d\n", numFields);
|
||||
//printf("\nin c, code:%d\n", code);
|
||||
|
||||
lua_State *L = p->state;
|
||||
lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback);
|
||||
lua_newtable(L);
|
||||
int table_index = lua_gettop(L);
|
||||
if( code < 0){
|
||||
printf("failed, reason:%s\n", taos_errstr(result));
|
||||
lua_pushinteger(L, -1);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushstring(L,"something is wrong");// taos_errstr(taos));
|
||||
lua_setfield(L, table_index, "error");
|
||||
}else{
|
||||
//printf("success to async query.\n");
|
||||
const int affectRows = taos_affected_rows(result);
|
||||
//printf(" affect rows:%d\r\n", affectRows);
|
||||
lua_pushinteger(L, 0);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushinteger(L, affectRows);
|
||||
lua_setfield(L, table_index, "affected");
|
||||
}
|
||||
|
||||
lua_call(L, 1, 0);
|
||||
}
|
||||
|
||||
static int l_async_query(lua_State *L){
|
||||
int r = luaL_ref(L, LUA_REGISTRYINDEX);
|
||||
TAOS * taos = (TAOS*)lua_topointer(L,1);
|
||||
const char * sqlstr = lua_tostring(L,2);
|
||||
// int stime = luaL_checknumber(L,3);
|
||||
|
||||
lua_newtable(L);
|
||||
int table_index = lua_gettop(L);
|
||||
|
||||
struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param));
|
||||
p->state = L;
|
||||
p->callback=r;
|
||||
// printf("r:%d, L:%d\n",r,L);
|
||||
taos_query_a(taos,sqlstr,async_query_callback,p);
|
||||
|
||||
lua_pushnumber(L, 0);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushstring(L, "ok");
|
||||
lua_setfield(L, table_index, "error");
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
|
||||
struct cb_param* p = (struct cb_param*) param;
|
||||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||
|
@ -307,6 +365,7 @@ static int l_close(lua_State *L){
|
|||
static const struct luaL_Reg lib[] = {
|
||||
{"connect", l_connect},
|
||||
{"query", l_query},
|
||||
{"query_a",l_async_query},
|
||||
{"close", l_close},
|
||||
{"open_stream", l_open_stream},
|
||||
{"close_stream", l_close_stream},
|
||||
|
|
|
@ -110,7 +110,25 @@ else
|
|||
end
|
||||
end
|
||||
|
||||
function callback(t)
|
||||
function async_query_callback(res)
|
||||
if res.code ~=0 then
|
||||
print("async_query_callback--- failed:"..res.error)
|
||||
return
|
||||
else
|
||||
|
||||
if(res.affected == 3) then
|
||||
print("async_query_callback, insert records--- pass")
|
||||
else
|
||||
print("async_query_callback, insert records---failed: expect 3 affected records, actually affected "..res.affected)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
||||
driver.query_a(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback)
|
||||
|
||||
|
||||
function stream_callback(t)
|
||||
print("------------------------")
|
||||
print("continuous query result:")
|
||||
for key, value in pairs(t) do
|
||||
|
@ -119,7 +137,7 @@ function callback(t)
|
|||
end
|
||||
|
||||
local stream
|
||||
res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0,callback)
|
||||
res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0, stream_callback)
|
||||
if res.code ~=0 then
|
||||
print("open stream--- failed:"..res.error)
|
||||
return
|
||||
|
@ -146,4 +164,5 @@ while loop_index < 30 do
|
|||
end
|
||||
|
||||
driver.close_stream(stream)
|
||||
|
||||
driver.close(conn)
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
def getBuildPath(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
if ("community" in selfPath):
|
||||
projPath = selfPath[:selfPath.find("community")]
|
||||
else:
|
||||
projPath = selfPath[:selfPath.find("tests")]
|
||||
|
||||
for root, dirs, files in os.walk(projPath):
|
||||
if ("taosd" in files):
|
||||
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||
if ("packaging" not in rootRealPath):
|
||||
buildPath = root[:len(root) - len("/build/bin")]
|
||||
break
|
||||
return buildPath
|
||||
|
||||
def isLuaInstalled(self):
|
||||
if not which('lua'):
|
||||
tdLog.exit("Lua not found!")
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
# tdLog.info("Check if Lua installed")
|
||||
# if not self.isLuaInstalled():
|
||||
# sys.exit(1)
|
||||
|
||||
buildPath = self.getBuildPath()
|
||||
if (buildPath == ""):
|
||||
tdLog.exit("taosd not found!")
|
||||
else:
|
||||
tdLog.info("taosd found in %s" % buildPath)
|
||||
|
||||
targetPath = buildPath + "/../tests/examples/lua"
|
||||
tdLog.info(targetPath)
|
||||
currentPath = os.getcwd()
|
||||
os.chdir(targetPath)
|
||||
os.system('./build.sh')
|
||||
os.system('lua test.lua')
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
||||
#tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -35,3 +35,5 @@ python3.8 ./test.py $1 -s && sleep 1
|
|||
python3.8 ./test.py $1 -f client/client.py
|
||||
python3.8 ./test.py $1 -s && sleep 1
|
||||
|
||||
# connector
|
||||
python3.8 ./test.py $1 -f connector/lua.py
|
||||
|
|
Loading…
Reference in New Issue