fix several bug in stream
This commit is contained in:
parent
9ab3b63a10
commit
c096da449e
|
@ -121,7 +121,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
|
||||||
pQueryInfo->window.ekey = pStream->etime;
|
pQueryInfo->window.ekey = pStream->etime;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pQueryInfo->window.skey = pStream->stime;// - pStream->interval;
|
pQueryInfo->window.skey = pStream->stime - pStream->interval;
|
||||||
int64_t etime = taosGetTimestamp(pStream->precision);
|
int64_t etime = taosGetTimestamp(pStream->precision);
|
||||||
// delay to wait all data in last time window
|
// delay to wait all data in last time window
|
||||||
if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
|
if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||||
|
@ -150,7 +150,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
|
||||||
SSqlStream *pStream = (SSqlStream *)param;
|
SSqlStream *pStream = (SSqlStream *)param;
|
||||||
if (tres == NULL || numOfRows < 0) {
|
if (tres == NULL || numOfRows < 0) {
|
||||||
int64_t retryDelay = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
|
int64_t retryDelay = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
|
||||||
tscError("%p stream:%p, query data failed, code:%d, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows,
|
tscError("%p stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows,
|
||||||
retryDelay);
|
retryDelay);
|
||||||
|
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
|
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
|
||||||
|
@ -211,7 +211,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
|
||||||
|
|
||||||
if (pSql == NULL || numOfRows < 0) {
|
if (pSql == NULL || numOfRows < 0) {
|
||||||
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
|
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
|
||||||
tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
|
tscError("%p stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
|
||||||
|
|
||||||
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
|
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
|
||||||
return;
|
return;
|
||||||
|
@ -240,7 +240,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
|
||||||
/* no resuls in the query range, retry */
|
/* no resuls in the query range, retry */
|
||||||
// todo set retry dynamic time
|
// todo set retry dynamic time
|
||||||
int32_t retry = tsProjectExecInterval;
|
int32_t retry = tsProjectExecInterval;
|
||||||
tscError("%p stream:%p, retrieve no data, code:%d, retry in %" PRId32 "ms", pSql, pStream, numOfRows, retry);
|
tscError("%p stream:%p, retrieve no data, code:0x%08x, retry in %" PRId32 "ms", pSql, pStream, numOfRows, retry);
|
||||||
|
|
||||||
tscSetRetryTimer(pStream, pStream->pSql, retry);
|
tscSetRetryTimer(pStream, pStream->pSql, retry);
|
||||||
return;
|
return;
|
||||||
|
@ -487,7 +487,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
|
||||||
|
|
||||||
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
|
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
|
||||||
if (pStream == NULL) {
|
if (pStream == NULL) {
|
||||||
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
|
tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, sqlstr, pCmd->payload, pRes->code);
|
||||||
tscFreeSqlObj(pSql);
|
tscFreeSqlObj(pSql);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -512,7 +512,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
|
||||||
if (pRes->code != TSDB_CODE_SUCCESS) {
|
if (pRes->code != TSDB_CODE_SUCCESS) {
|
||||||
setErrorInfo(pSql, pRes->code, pCmd->payload);
|
setErrorInfo(pSql, pRes->code, pCmd->payload);
|
||||||
|
|
||||||
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
|
tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, sqlstr, pCmd->payload, pRes->code);
|
||||||
tscFreeSqlObj(pSql);
|
tscFreeSqlObj(pSql);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -564,6 +564,8 @@ void taos_close_stream(TAOS_STREAM *handle) {
|
||||||
taosTmrStopA(&(pStream->pTimer));
|
taosTmrStopA(&(pStream->pTimer));
|
||||||
|
|
||||||
tscDebug("%p stream:%p is closed", pSql, pStream);
|
tscDebug("%p stream:%p is closed", pSql, pStream);
|
||||||
|
// notify CQ to release the pStream object
|
||||||
|
pStream->fp(pStream->param, NULL, NULL);
|
||||||
|
|
||||||
tscFreeSqlObj(pSql);
|
tscFreeSqlObj(pSql);
|
||||||
pStream->pSql = NULL;
|
pStream->pSql = NULL;
|
||||||
|
|
|
@ -244,6 +244,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
||||||
|
|
||||||
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
|
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
|
||||||
SCqObj *pObj = (SCqObj *)param;
|
SCqObj *pObj = (SCqObj *)param;
|
||||||
|
if (tres == NULL && row == NULL) {
|
||||||
|
pObj->pStream = NULL;
|
||||||
|
return;
|
||||||
|
}
|
||||||
SCqContext *pContext = pObj->pContext;
|
SCqContext *pContext = pObj->pContext;
|
||||||
STSchema *pSchema = pObj->pSchema;
|
STSchema *pSchema = pObj->pSchema;
|
||||||
if (pObj->pStream == NULL) return;
|
if (pObj->pStream == NULL) return;
|
||||||
|
|
|
@ -5822,7 +5822,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
|
||||||
qDebug("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
|
qDebug("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
|
||||||
pQuery->window.ekey, pQuery->order.order);
|
pQuery->window.ekey, pQuery->order.order);
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
|
pQInfo->tableqinfoGroupInfo.numOfTables = 0;
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
###################################################################
|
||||||
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# This file is proprietary and confidential to TAOS Technologies.
|
||||||
|
# No part of this file may be reproduced, stored, transmitted,
|
||||||
|
# disclosed or used in any form or by any means other than as
|
||||||
|
# expressly provided by the written permission from Jianhui Tao
|
||||||
|
#
|
||||||
|
###################################################################
|
||||||
|
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import taos
|
||||||
|
from util.log import tdLog
|
||||||
|
from util.cases import tdCases
|
||||||
|
from util.sql import tdSql
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def init(self, conn, logSql):
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor(), logSql)
|
||||||
|
|
||||||
|
def createFuncStream(self, expr, suffix, value):
|
||||||
|
tbname = "strm_" + suffix
|
||||||
|
tdLog.info("create stream table %s" % tbname)
|
||||||
|
tdSql.query("select %s from stb interval(1d)" % expr)
|
||||||
|
tdSql.checkData(0, 1, value)
|
||||||
|
tdSql.execute("create table %s as select %s from stb interval(1d)" % (tbname, expr))
|
||||||
|
|
||||||
|
def checkStreamData(self, suffix, value):
|
||||||
|
sql = "select * from strm_" + suffix
|
||||||
|
tdSql.waitedQuery(sql, 1, 120)
|
||||||
|
tdSql.checkData(0, 1, value)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
tbNum = 10
|
||||||
|
rowNum = 20
|
||||||
|
|
||||||
|
tdSql.prepare()
|
||||||
|
|
||||||
|
tdLog.info("===== preparing data =====")
|
||||||
|
tdSql.execute(
|
||||||
|
"create table stb(ts timestamp, tbcol int, tbcol2 float) tags(tgcol int)")
|
||||||
|
for i in range(tbNum):
|
||||||
|
tdSql.execute("create table tb%d using stb tags(%d)" % (i, i))
|
||||||
|
for j in range(rowNum):
|
||||||
|
tdSql.execute(
|
||||||
|
"insert into tb%d values (now - %dm, %d, %d)" %
|
||||||
|
(i, 1440 - j, j, j))
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
self.createFuncStream("count(*)", "c1", 200)
|
||||||
|
self.createFuncStream("count(tbcol)", "c2", 200)
|
||||||
|
self.createFuncStream("count(tbcol2)", "c3", 200)
|
||||||
|
self.createFuncStream("avg(tbcol)", "av", 9.5)
|
||||||
|
self.createFuncStream("sum(tbcol)", "su", 1900)
|
||||||
|
self.createFuncStream("min(tbcol)", "mi", 0)
|
||||||
|
self.createFuncStream("max(tbcol)", "ma", 19)
|
||||||
|
self.createFuncStream("first(tbcol)", "fi", 0)
|
||||||
|
self.createFuncStream("last(tbcol)", "la", 19)
|
||||||
|
#tdSql.query("select stddev(tbcol) from stb interval(1d)")
|
||||||
|
#tdSql.query("select leastsquares(tbcol, 1, 1) from stb interval(1d)")
|
||||||
|
tdSql.query("select top(tbcol, 1) from stb interval(1d)")
|
||||||
|
tdSql.query("select bottom(tbcol, 1) from stb interval(1d)")
|
||||||
|
#tdSql.query("select percentile(tbcol, 1) from stb interval(1d)")
|
||||||
|
#tdSql.query("select diff(tbcol) from stb interval(1d)")
|
||||||
|
|
||||||
|
tdSql.query("select count(tbcol) from stb where ts < now + 4m interval(1d)")
|
||||||
|
tdSql.checkData(0, 1, 200)
|
||||||
|
#tdSql.execute("create table strm_wh as select count(tbcol) from stb where ts < now + 4m interval(1d)")
|
||||||
|
|
||||||
|
self.createFuncStream("count(tbcol)", "as", 200)
|
||||||
|
|
||||||
|
tdSql.query("select count(tbcol) from stb interval(1d) group by tgcol")
|
||||||
|
tdSql.checkData(0, 1, 20)
|
||||||
|
|
||||||
|
tdSql.query("select count(tbcol) from stb where ts < now + 4m interval(1d) group by tgcol")
|
||||||
|
tdSql.checkData(0, 1, 20)
|
||||||
|
|
||||||
|
self.checkStreamData("c1", 200)
|
||||||
|
self.checkStreamData("c2", 200)
|
||||||
|
self.checkStreamData("c3", 200)
|
||||||
|
self.checkStreamData("av", 9.5)
|
||||||
|
self.checkStreamData("su", 1900)
|
||||||
|
self.checkStreamData("mi", 0)
|
||||||
|
self.checkStreamData("ma", 19)
|
||||||
|
self.checkStreamData("fi", 0)
|
||||||
|
self.checkStreamData("la", 19)
|
||||||
|
#self.checkStreamData("wh", 200)
|
||||||
|
self.checkStreamData("as", 200)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
|
||||||
|
|
|
@ -148,23 +148,23 @@ class TDTestCase:
|
||||||
def datatypes(self):
|
def datatypes(self):
|
||||||
tdLog.debug("begin data types")
|
tdLog.debug("begin data types")
|
||||||
tdSql.prepare()
|
tdSql.prepare()
|
||||||
tdSql.execute("create table stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(15), c6 nchar(15), c7 bool) tags(t1 int, t2 binary(15))")
|
tdSql.execute("create table stb3 (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(15), c6 nchar(15), c7 bool) tags(t1 int, t2 binary(15))")
|
||||||
tdSql.execute("create table tb0 using stb tags(0, 'tb0')")
|
tdSql.execute("create table tb0 using stb3 tags(0, 'tb0')")
|
||||||
tdSql.execute("create table tb1 using stb tags(1, 'tb1')")
|
tdSql.execute("create table tb1 using stb3 tags(1, 'tb1')")
|
||||||
tdSql.execute("create table tb2 using stb tags(2, 'tb2')")
|
tdSql.execute("create table tb2 using stb3 tags(2, 'tb2')")
|
||||||
tdSql.execute("create table tb3 using stb tags(3, 'tb3')")
|
tdSql.execute("create table tb3 using stb3 tags(3, 'tb3')")
|
||||||
tdSql.execute("create table tb4 using stb tags(4, 'tb4')")
|
tdSql.execute("create table tb4 using stb3 tags(4, 'tb4')")
|
||||||
|
|
||||||
tdSql.execute("create table strm0 as select count(ts), count(c1), max(c2), min(c4), first(c5), last(c6) from stb where ts < now + 30s interval(4s) sliding(2s)")
|
tdSql.execute("create table strm0 as select count(ts), count(c1), max(c2), min(c4), first(c5), last(c6) from stb3 where ts < now + 30s interval(4s) sliding(2s)")
|
||||||
#tdSql.execute("create table strm0 as select count(ts), count(c1), max(c2), min(c4), first(c5) from stb where ts < now + 30s interval(4s) sliding(2s)")
|
#tdSql.execute("create table strm0 as select count(ts), count(c1), max(c2), min(c4), first(c5) from stb where ts < now + 30s interval(4s) sliding(2s)")
|
||||||
tdLog.sleep(1)
|
tdLog.sleep(1)
|
||||||
tdSql.execute("insert into tb0 values (now, 0, 0, 0, 0, 'binary0', '涛思0', true) tb1 values (now, 1, 1, 1, 1, 'binary1', '涛思1', false) tb2 values (now, 2, 2, 2, 2, 'binary2', '涛思2', true) tb3 values (now, 3, 3, 3, 3, 'binary3', '涛思3', false) tb4 values (now, 4, 4, 4, 4, 'binary4', '涛思4', true) ")
|
tdSql.execute("insert into tb0 values (now, 0, 0, 0, 0, 'binary0', '涛思0', true) tb1 values (now, 1, 1, 1, 1, 'binary1', '涛思1', false) tb2 values (now, 2, 2, 2, 2, 'binary2', '涛思2', true) tb3 values (now, 3, 3, 3, 3, 'binary3', '涛思3', false) tb4 values (now, 4, 4, 4, 4, 'binary4', '涛思4', true) ")
|
||||||
|
|
||||||
tdSql.waitedQuery("select * from strm0 order by ts desc", 2, 60)
|
tdSql.waitedQuery("select * from strm0 order by ts desc", 2, 120)
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
|
|
||||||
tdSql.execute("insert into tb0 values (now, 10, 10, 10, 10, 'binary0', '涛思0', true) tb1 values (now, 11, 11, 11, 11, 'binary1', '涛思1', false) tb2 values (now, 12, 12, 12, 12, 'binary2', '涛思2', true) tb3 values (now, 13, 13, 13, 13, 'binary3', '涛思3', false) tb4 values (now, 14, 14, 14, 14, 'binary4', '涛思4', true) ")
|
tdSql.execute("insert into tb0 values (now, 10, 10, 10, 10, 'binary0', '涛思0', true) tb1 values (now, 11, 11, 11, 11, 'binary1', '涛思1', false) tb2 values (now, 12, 12, 12, 12, 'binary2', '涛思2', true) tb3 values (now, 13, 13, 13, 13, 'binary3', '涛思3', false) tb4 values (now, 14, 14, 14, 14, 'binary4', '涛思4', true) ")
|
||||||
tdSql.waitedQuery("select * from strm0 order by ts desc", 4, 60)
|
tdSql.waitedQuery("select * from strm0 order by ts desc", 4, 120)
|
||||||
tdSql.checkRows(4)
|
tdSql.checkRows(4)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
|
Loading…
Reference in New Issue