Merge pull request #13040 from taosdata/feature/stream
fix(stream): double free
This commit is contained in:
commit
757e4c2d3e
|
@ -84,15 +84,16 @@ typedef struct {
|
||||||
} SStreamCheckpoint;
|
} SStreamCheckpoint;
|
||||||
|
|
||||||
static FORCE_INLINE SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
|
static FORCE_INLINE SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
|
||||||
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosMemoryCalloc(1, sizeof(SStreamDataSubmit));
|
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
|
||||||
if (pDataSubmit == NULL) return NULL;
|
if (pDataSubmit == NULL) return NULL;
|
||||||
pDataSubmit->data = pReq;
|
|
||||||
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
|
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
|
||||||
if (pDataSubmit->data == NULL) goto FAIL;
|
if (pDataSubmit->dataRef == NULL) goto FAIL;
|
||||||
|
pDataSubmit->data = pReq;
|
||||||
*pDataSubmit->dataRef = 1;
|
*pDataSubmit->dataRef = 1;
|
||||||
|
pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT;
|
||||||
return pDataSubmit;
|
return pDataSubmit;
|
||||||
FAIL:
|
FAIL:
|
||||||
taosMemoryFree(pDataSubmit);
|
taosFreeQitem(pDataSubmit);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,7 +108,6 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit)
|
||||||
if (ref == 0) {
|
if (ref == 0) {
|
||||||
taosMemoryFree(pDataSubmit->data);
|
taosMemoryFree(pDataSubmit->data);
|
||||||
taosMemoryFree(pDataSubmit->dataRef);
|
taosMemoryFree(pDataSubmit->dataRef);
|
||||||
taosFreeQitem(pDataSubmit);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -829,27 +829,15 @@ FAIL:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
|
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
bool failed = false;
|
bool failed = false;
|
||||||
|
SStreamDataSubmit* pSubmit = NULL;
|
||||||
|
|
||||||
SStreamDataSubmit* pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
|
pSubmit = streamDataSubmitNew(pReq);
|
||||||
if (pSubmit == NULL) {
|
if (pSubmit == NULL) {
|
||||||
failed = true;
|
failed = true;
|
||||||
goto SET_TASK_FAIL;
|
|
||||||
}
|
|
||||||
pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t));
|
|
||||||
if (pSubmit->dataRef == NULL) {
|
|
||||||
failed = true;
|
|
||||||
goto SET_TASK_FAIL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pSubmit->type = STREAM_INPUT__DATA_SUBMIT;
|
|
||||||
/*pSubmit->sourceVer = ver;*/
|
|
||||||
/*pSubmit->sourceVg = pTq->pVnode->config.vgId;*/
|
|
||||||
pSubmit->data = pReq;
|
|
||||||
*pSubmit->dataRef = 1;
|
|
||||||
|
|
||||||
SET_TASK_FAIL:
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
|
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
@ -864,7 +852,9 @@ SET_TASK_FAIL:
|
||||||
}
|
}
|
||||||
|
|
||||||
streamDataSubmitRefInc(pSubmit);
|
streamDataSubmitRefInc(pSubmit);
|
||||||
taosWriteQitem(pTask->inputQ, pSubmit);
|
SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
|
||||||
|
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit));
|
||||||
|
taosWriteQitem(pTask->inputQ, pSubmitClone);
|
||||||
|
|
||||||
int8_t execStatus = atomic_load_8(&pTask->status);
|
int8_t execStatus = atomic_load_8(&pTask->status);
|
||||||
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
|
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
|
||||||
|
@ -887,18 +877,12 @@ SET_TASK_FAIL:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!failed) {
|
if (pSubmit) {
|
||||||
streamDataSubmitRefDec(pSubmit);
|
streamDataSubmitRefDec(pSubmit);
|
||||||
return 0;
|
taosFreeQitem(pSubmit);
|
||||||
} else {
|
|
||||||
if (pSubmit) {
|
|
||||||
if (pSubmit->dataRef) {
|
|
||||||
taosMemoryFree(pSubmit->dataRef);
|
|
||||||
}
|
|
||||||
taosFreeQitem(pSubmit);
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return failed ? -1 : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
|
@ -166,6 +166,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
|
||||||
// destroy
|
// destroy
|
||||||
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
|
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
streamDataSubmitRefDec((SStreamDataSubmit*)data);
|
streamDataSubmitRefDec((SStreamDataSubmit*)data);
|
||||||
|
taosFreeQitem(data);
|
||||||
} else {
|
} else {
|
||||||
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
|
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
|
||||||
taosFreeQitem(data);
|
taosFreeQitem(data);
|
||||||
|
@ -173,6 +174,25 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
||||||
|
while (1) {
|
||||||
|
void* data = NULL;
|
||||||
|
taosGetQitem(pTask->inputQAll, &data);
|
||||||
|
if (data == NULL) break;
|
||||||
|
|
||||||
|
streamTaskExecImpl(pTask, data, pRes);
|
||||||
|
|
||||||
|
if (taosArrayGetSize(pRes) != 0) {
|
||||||
|
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||||
|
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||||
|
qRes->blocks = pRes;
|
||||||
|
taosWriteQitem(pTask->outputQ, qRes);
|
||||||
|
return taosArrayInit(0, sizeof(SSDataBlock));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return pRes;
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: handle version
|
// TODO: handle version
|
||||||
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||||
|
@ -182,88 +202,21 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
void* exec = pTask->exec.executor;
|
void* exec = pTask->exec.executor;
|
||||||
if (execStatus == TASK_STATUS__IDLE) {
|
if (execStatus == TASK_STATUS__IDLE) {
|
||||||
// first run, from qall, handle failure from last exec
|
// first run, from qall, handle failure from last exec
|
||||||
while (1) {
|
pRes = streamExecForQall(pTask, pRes);
|
||||||
void* data = NULL;
|
if (pRes == NULL) goto FAIL;
|
||||||
taosGetQitem(pTask->inputQAll, &data);
|
|
||||||
if (data == NULL) break;
|
|
||||||
|
|
||||||
streamTaskExecImpl(pTask, data, pRes);
|
|
||||||
|
|
||||||
/*taosFreeQitem(data);*/
|
|
||||||
|
|
||||||
if (taosArrayGetSize(pRes) != 0) {
|
|
||||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
|
||||||
resQ->type = STREAM_INPUT__DATA_BLOCK;
|
|
||||||
resQ->blocks = pRes;
|
|
||||||
taosWriteQitem(pTask->outputQ, resQ);
|
|
||||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
|
||||||
if (pRes == NULL) goto FAIL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// second run, from inputQ
|
// second run, from inputQ
|
||||||
taosReadAllQitems(pTask->inputQ, pTask->inputQAll);
|
taosReadAllQitems(pTask->inputQ, pTask->inputQAll);
|
||||||
while (1) {
|
pRes = streamExecForQall(pTask, pRes);
|
||||||
void* data = NULL;
|
if (pRes == NULL) goto FAIL;
|
||||||
taosGetQitem(pTask->inputQAll, &data);
|
|
||||||
if (data == NULL) break;
|
|
||||||
|
|
||||||
streamTaskExecImpl(pTask, data, pRes);
|
|
||||||
|
|
||||||
/*taosFreeQitem(data);*/
|
|
||||||
|
|
||||||
if (taosArrayGetSize(pRes) != 0) {
|
|
||||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
|
||||||
resQ->type = STREAM_INPUT__DATA_BLOCK;
|
|
||||||
resQ->blocks = pRes;
|
|
||||||
taosWriteQitem(pTask->outputQ, resQ);
|
|
||||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
|
||||||
if (pRes == NULL) goto FAIL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// set status closing
|
// set status closing
|
||||||
atomic_store_8(&pTask->status, TASK_STATUS__CLOSING);
|
atomic_store_8(&pTask->status, TASK_STATUS__CLOSING);
|
||||||
// third run, make sure all inputQ is cleared
|
|
||||||
|
// third run, make sure inputQ and qall are cleared
|
||||||
taosReadAllQitems(pTask->inputQ, pTask->inputQAll);
|
taosReadAllQitems(pTask->inputQ, pTask->inputQAll);
|
||||||
while (1) {
|
pRes = streamExecForQall(pTask, pRes);
|
||||||
void* data = NULL;
|
if (pRes == NULL) goto FAIL;
|
||||||
taosGetQitem(pTask->inputQAll, &data);
|
|
||||||
if (data == NULL) break;
|
|
||||||
|
|
||||||
streamTaskExecImpl(pTask, data, pRes);
|
|
||||||
|
|
||||||
/*taosFreeQitem(data);*/
|
|
||||||
|
|
||||||
if (taosArrayGetSize(pRes) != 0) {
|
|
||||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
|
||||||
resQ->type = STREAM_INPUT__DATA_BLOCK;
|
|
||||||
resQ->blocks = pRes;
|
|
||||||
taosWriteQitem(pTask->outputQ, resQ);
|
|
||||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
|
||||||
if (pRes == NULL) goto FAIL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// set status closing
|
|
||||||
atomic_store_8(&pTask->status, TASK_STATUS__CLOSING);
|
|
||||||
// third run, make sure all inputQ is cleared
|
|
||||||
taosReadAllQitems(pTask->inputQ, pTask->inputQAll);
|
|
||||||
while (1) {
|
|
||||||
void* data = NULL;
|
|
||||||
taosGetQitem(pTask->inputQAll, &data);
|
|
||||||
if (data == NULL) break;
|
|
||||||
|
|
||||||
streamTaskExecImpl(pTask, data, pRes);
|
|
||||||
|
|
||||||
taosFreeQitem(data);
|
|
||||||
|
|
||||||
if (taosArrayGetSize(pRes) != 0) {
|
|
||||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
|
||||||
resQ->type = STREAM_INPUT__DATA_BLOCK;
|
|
||||||
resQ->blocks = pRes;
|
|
||||||
taosWriteQitem(pTask->outputQ, resQ);
|
|
||||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
|
||||||
if (pRes == NULL) goto FAIL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
|
atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -1,169 +0,0 @@
|
||||||
###################################################################
|
|
||||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
|
||||||
# All rights reserved.
|
|
||||||
#
|
|
||||||
# This file is proprietary and confidential to TAOS Technologies.
|
|
||||||
# No part of this file may be reproduced, stored, transmitted,
|
|
||||||
# disclosed or used in any form or by any means other than as
|
|
||||||
# expressly provided by the written permission from Jianhui Tao
|
|
||||||
#
|
|
||||||
###################################################################
|
|
||||||
|
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
import threading
|
|
||||||
import taos
|
|
||||||
import sys
|
|
||||||
import json
|
|
||||||
import time
|
|
||||||
import random
|
|
||||||
# query sql
|
|
||||||
query_sql = [
|
|
||||||
# first supertable
|
|
||||||
"select count(*) from test.meters ;",
|
|
||||||
"select count(*) from test.meters where t3 > 2;",
|
|
||||||
"select count(*) from test.meters where ts <> '2020-05-13 10:00:00.002';",
|
|
||||||
"select count(*) from test.meters where t7 like 'taos_1%';",
|
|
||||||
"select count(*) from test.meters where t7 like '_____2';",
|
|
||||||
"select count(*) from test.meters where t8 like '%思%';",
|
|
||||||
"select count(*) from test.meters interval(1n) order by ts desc;",
|
|
||||||
#"select max(c0) from test.meters group by tbname",
|
|
||||||
"select first(ts) from test.meters where t5 >5000 and t5<5100;",
|
|
||||||
"select last(ts) from test.meters where t5 >5000 and t5<5100;",
|
|
||||||
"select last_row(*) from test.meters;",
|
|
||||||
"select twa(c1) from test.t1 where ts > 1500000001000 and ts < 1500000101000" ,
|
|
||||||
"select avg(c1) from test.meters where t5 >5000 and t5<5100;",
|
|
||||||
"select bottom(c1, 2) from test.t1;",
|
|
||||||
"select diff(c1) from test.t1;",
|
|
||||||
"select leastsquares(c1, 1, 1) from test.t1 ;",
|
|
||||||
"select max(c1) from test.meters where t5 >5000 and t5<5100;",
|
|
||||||
"select min(c1) from test.meters where t5 >5000 and t5<5100;",
|
|
||||||
"select c1 + c2 + c1 / c5 + c4 + c2 from test.t1;",
|
|
||||||
"select percentile(c1, 50) from test.t1;",
|
|
||||||
"select spread(c1) from test.t1 ;",
|
|
||||||
"select stddev(c1) from test.t1;",
|
|
||||||
"select sum(c1) from test.meters where t5 >5000 and t5<5100;",
|
|
||||||
"select top(c1, 2) from test.meters where t5 >5000 and t5<5100;"
|
|
||||||
"select twa(c4) from test.t1 where ts > 1500000001000 and ts < 1500000101000" ,
|
|
||||||
"select avg(c4) from test.meters where t5 >5000 and t5<5100;",
|
|
||||||
"select bottom(c4, 2) from test.t1 where t5 >5000 and t5<5100;",
|
|
||||||
"select diff(c4) from test.t1 where t5 >5000 and t5<5100;",
|
|
||||||
"select leastsquares(c4, 1, 1) from test.t1 ;",
|
|
||||||
"select max(c4) from test.meters where t5 >5000 and t5<5100;",
|
|
||||||
"select min(c4) from test.meters where t5 >5000 and t5<5100;",
|
|
||||||
"select c5 + c2 + c4 / c5 + c4 + c2 from test.t1 ;",
|
|
||||||
"select percentile(c5, 50) from test.t1;",
|
|
||||||
"select spread(c5) from test.t1 ;",
|
|
||||||
"select stddev(c5) from test.t1 where t5 >5000 and t5<5100;",
|
|
||||||
"select sum(c5) from test.meters where t5 >5000 and t5<5100;",
|
|
||||||
"select top(c5, 2) from test.meters where t5 >5000 and t5<5100;",
|
|
||||||
#all vnode
|
|
||||||
"select count(*) from test.meters where t5 >5000 and t5<5100",
|
|
||||||
"select max(c0),avg(c1) from test.meters where t5 >5000 and t5<5100",
|
|
||||||
"select sum(c5),avg(c1) from test.meters where t5 >5000 and t5<5100",
|
|
||||||
"select max(c0),min(c5) from test.meters where t5 >5000 and t5<5100",
|
|
||||||
"select min(c0),avg(c5) from test.meters where t5 >5000 and t5<5100",
|
|
||||||
# second supertable
|
|
||||||
"select count(*) from test.meters1 where t3 > 2;",
|
|
||||||
"select count(*) from test.meters1 where ts <> '2020-05-13 10:00:00.002';",
|
|
||||||
"select count(*) from test.meters where t7 like 'taos_1%';",
|
|
||||||
"select count(*) from test.meters where t7 like '_____2';",
|
|
||||||
"select count(*) from test.meters where t8 like '%思%';",
|
|
||||||
"select count(*) from test.meters1 interval(1n) order by ts desc;",
|
|
||||||
#"select max(c0) from test.meters1 group by tbname",
|
|
||||||
"select first(ts) from test.meters1 where t5 >5000 and t5<5100;",
|
|
||||||
"select last(ts) from test.meters1 where t5 >5000 and t5<5100;",
|
|
||||||
"select last_row(*) from test.meters1 ;",
|
|
||||||
"select twa(c1) from test.m1 where ts > 1500000001000 and ts < 1500000101000" ,
|
|
||||||
"select avg(c1) from test.meters1 where t5 >5000 and t5<5100;",
|
|
||||||
"select bottom(c1, 2) from test.m1 where t5 >5000 and t5<5100;",
|
|
||||||
"select diff(c1) from test.m1 ;",
|
|
||||||
"select leastsquares(c1, 1, 1) from test.m1 ;",
|
|
||||||
"select max(c1) from test.meters1 where t5 >5000 and t5<5100;",
|
|
||||||
"select min(c1) from test.meters1 where t5 >5000 and t5<5100;",
|
|
||||||
"select c1 + c2 + c1 / c0 + c2 from test.m1 ;",
|
|
||||||
"select percentile(c1, 50) from test.m1;",
|
|
||||||
"select spread(c1) from test.m1 ;",
|
|
||||||
"select stddev(c1) from test.m1;",
|
|
||||||
"select sum(c1) from test.meters1 where t5 >5000 and t5<5100;",
|
|
||||||
"select top(c1, 2) from test.meters1 where t5 >5000 and t5<5100;",
|
|
||||||
"select twa(c5) from test.m1 where ts > 1500000001000 and ts < 1500000101000" ,
|
|
||||||
"select avg(c5) from test.meters1 where t5 >5000 and t5<5100;",
|
|
||||||
"select bottom(c5, 2) from test.m1;",
|
|
||||||
"select diff(c5) from test.m1;",
|
|
||||||
"select leastsquares(c5, 1, 1) from test.m1 ;",
|
|
||||||
"select max(c5) from test.meters1 where t5 >5000 and t5<5100;",
|
|
||||||
"select min(c5) from test.meters1 where t5 >5000 and t5<5100;",
|
|
||||||
"select c5 + c2 + c4 / c5 + c0 from test.m1;",
|
|
||||||
"select percentile(c4, 50) from test.m1;",
|
|
||||||
"select spread(c4) from test.m1 ;",
|
|
||||||
"select stddev(c4) from test.m1;",
|
|
||||||
"select sum(c4) from test.meters1 where t5 >5100 and t5<5300;",
|
|
||||||
"select top(c4, 2) from test.meters1 where t5 >5100 and t5<5300;",
|
|
||||||
"select count(*) from test.meters1 where t5 >5100 and t5<5300",
|
|
||||||
#all vnode
|
|
||||||
"select count(*) from test.meters1 where t5 >5100 and t5<5300",
|
|
||||||
"select max(c0),avg(c1) from test.meters1 where t5 >5000 and t5<5100",
|
|
||||||
"select sum(c5),avg(c1) from test.meters1 where t5 >5000 and t5<5100",
|
|
||||||
"select max(c0),min(c5) from test.meters1 where t5 >5000 and t5<5100",
|
|
||||||
"select min(c0),avg(c5) from test.meters1 where t5 >5000 and t5<5100",
|
|
||||||
#join
|
|
||||||
# "select * from meters,meters1 where meters.ts = meters1.ts and meters.t5 = meters1.t5",
|
|
||||||
# "select * from meters,meters1 where meters.ts = meters1.ts and meters.t7 = meters1.t7",
|
|
||||||
# "select * from meters,meters1 where meters.ts = meters1.ts and meters.t8 = meters1.t8",
|
|
||||||
# "select meters.ts,meters1.c2 from meters,meters1 where meters.ts = meters1.ts and meters.t8 = meters1.t8"
|
|
||||||
]
|
|
||||||
|
|
||||||
class ConcurrentInquiry:
|
|
||||||
def initConnection(self):
|
|
||||||
self.numOfTherads = 50
|
|
||||||
self.ts=1500000001000
|
|
||||||
|
|
||||||
def SetThreadsNum(self,num):
|
|
||||||
self.numOfTherads=num
|
|
||||||
def query_thread(self,threadID):
|
|
||||||
host = "10.211.55.14"
|
|
||||||
user = "root"
|
|
||||||
password = "taosdata"
|
|
||||||
conn = taos.connect(
|
|
||||||
host,
|
|
||||||
user,
|
|
||||||
password,
|
|
||||||
)
|
|
||||||
cl = conn.cursor()
|
|
||||||
cl.execute("use test;")
|
|
||||||
|
|
||||||
print("Thread %d: starting" % threadID)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
ran_query_sql=query_sql
|
|
||||||
random.shuffle(ran_query_sql)
|
|
||||||
for i in ran_query_sql:
|
|
||||||
print("Thread %d : %s"% (threadID,i))
|
|
||||||
try:
|
|
||||||
start = time.time()
|
|
||||||
cl.execute(i)
|
|
||||||
cl.fetchall()
|
|
||||||
end = time.time()
|
|
||||||
print("time cost :",end-start)
|
|
||||||
except Exception as e:
|
|
||||||
print(
|
|
||||||
"Failure thread%d, sql: %s,exception: %s" %
|
|
||||||
(threadID, str(i),str(e)))
|
|
||||||
exit(-1)
|
|
||||||
|
|
||||||
|
|
||||||
print("Thread %d: finishing" % threadID)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
|
|
||||||
threads = []
|
|
||||||
for i in range(self.numOfTherads):
|
|
||||||
thread = threading.Thread(target=self.query_thread, args=(i,))
|
|
||||||
threads.append(thread)
|
|
||||||
thread.start()
|
|
||||||
|
|
||||||
q = ConcurrentInquiry()
|
|
||||||
q.initConnection()
|
|
||||||
q.run()
|
|
|
@ -0,0 +1,93 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import sys
|
||||||
|
from util.log import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.common import tdCom
|
||||||
|
class TDTestCase:
|
||||||
|
def init(self, conn, logSql):
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor(), logSql)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
#for i in range(100):
|
||||||
|
tdSql.prepare()
|
||||||
|
dbname = tdCom.getLongName(10, "letters")
|
||||||
|
tdSql.execute('show databases')
|
||||||
|
tdSql.execute('drop database if exists ttxkbrzmpo')
|
||||||
|
tdSql.execute('create database if not exists ttxkbrzmpo vgroups 1')
|
||||||
|
tdSql.execute('use ttxkbrzmpo')
|
||||||
|
tdSql.execute('create table if not exists downsampling_stb (ts timestamp, c1 int, c2 double, c3 varchar(100), c4 bool) tags (t1 int, t2 double, t3 varchar(100), t4 bool);')
|
||||||
|
tdSql.execute('create table downsampling_ct1 using downsampling_stb tags(10, 10.1, "Beijing", True);')
|
||||||
|
tdSql.execute('create table if not exists scalar_stb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 nchar(20), c5 nchar(20)) tags (t1 int);')
|
||||||
|
tdSql.execute('create table scalar_ct1 using scalar_stb tags(10);')
|
||||||
|
tdSql.execute('create stream downsampling_stream into output_downsampling_stb as select _wstartts AS start, min(c1), max(c2), sum(c1) from downsampling_stb interval(10m);')
|
||||||
|
tdSql.execute('insert into downsampling_ct1 values (1653547828591, 100, 100.1, "Beijing", True);')
|
||||||
|
tdSql.execute('insert into downsampling_ct1 values (1653547828591+1s, -100, -100.1, "Tianjin", False);')
|
||||||
|
tdSql.execute('insert into downsampling_ct1 values (1653547828591+2s, 50, 50.3, "HeBei", False);')
|
||||||
|
tdSql.execute('select * from output_downsampling_stb;')
|
||||||
|
tdSql.execute('select start, `min(c1)`, `max(c2)`, `sum(c1)` from output_downsampling_stb;')
|
||||||
|
tdSql.execute('select _wstartts AS start, min(c1), max(c2), sum(c1) from downsampling_stb interval(10m);')
|
||||||
|
tdSql.execute('insert into downsampling_ct1 values (1653547828591+10m, 60, 60.3, "heilongjiang", True);')
|
||||||
|
tdSql.execute('insert into downsampling_ct1 values (1653547828591+11m, 70, 70.3, "JiLin", True);')
|
||||||
|
tdSql.execute('select * from output_downsampling_stb;')
|
||||||
|
tdSql.execute('select start, `min(c1)`, `max(c2)`, `sum(c1)` from output_downsampling_stb;')
|
||||||
|
tdSql.execute('select _wstartts AS start, min(c1), max(c2), sum(c1) from downsampling_stb interval(10m);')
|
||||||
|
tdSql.execute('insert into downsampling_ct1 values (1653547828591+21m, 70, 70.3, "JiLin", True);')
|
||||||
|
tdSql.execute('select * from output_downsampling_stb;')
|
||||||
|
tdSql.execute('select * from output_downsampling_stb;')
|
||||||
|
tdSql.execute('select start, `min(c1)`, `max(c2)`, `sum(c1)` from output_downsampling_stb;')
|
||||||
|
tdSql.execute('select _wstartts AS start, min(c1), max(c2), sum(c1) from downsampling_stb interval(10m);')
|
||||||
|
tdSql.execute('create stream abs_stream into output_abs_stb as select ts, abs(c1), abs(c2), c3 from scalar_stb;')
|
||||||
|
tdSql.query('describe output_abs_stb')
|
||||||
|
tdSql.execute('create stream acos_stream into output_acos_stb as select ts, acos(c1), acos(c2), c3 from scalar_stb;')
|
||||||
|
tdSql.query('describe output_acos_stb')
|
||||||
|
tdSql.execute('create stream asin_stream into output_asin_stb as select ts, asin(c1), asin(c2), c3 from scalar_stb;')
|
||||||
|
tdSql.query('describe output_asin_stb')
|
||||||
|
tdSql.execute('create stream atan_stream into output_atan_stb as select ts, atan(c1), atan(c2), c3 from scalar_stb;')
|
||||||
|
tdSql.query('describe output_atan_stb')
|
||||||
|
tdSql.execute('create stream ceil_stream into output_ceil_stb as select ts, ceil(c1), ceil(c2), c3 from scalar_stb;')
|
||||||
|
tdSql.query('describe output_ceil_stb')
|
||||||
|
tdSql.execute('create stream cos_stream into output_cos_stb as select ts, cos(c1), cos(c2), c3 from scalar_stb;')
|
||||||
|
tdSql.query('describe output_cos_stb')
|
||||||
|
tdSql.execute('create stream floor_stream into output_floor_stb as select ts, floor(c1), floor(c2), c3 from scalar_stb;')
|
||||||
|
tdSql.query('describe output_floor_stb')
|
||||||
|
tdSql.execute('create stream log_stream into output_log_stb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_stb;')
|
||||||
|
tdSql.query('describe output_log_stb')
|
||||||
|
tdSql.execute('create stream pow_stream into output_pow_stb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_stb;')
|
||||||
|
tdSql.query('describe output_pow_stb')
|
||||||
|
tdSql.execute('create stream round_stream into output_round_stb as select ts, round(c1), round(c2), c3 from scalar_stb;')
|
||||||
|
tdSql.query('describe output_round_stb')
|
||||||
|
tdSql.execute('create stream sin_stream into output_sin_stb as select ts, sin(c1), sin(c2), c3 from scalar_stb;')
|
||||||
|
tdSql.query('describe output_sin_stb')
|
||||||
|
tdSql.execute('create stream sqrt_stream into output_sqrt_stb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_stb;')
|
||||||
|
tdSql.query('describe output_sqrt_stb')
|
||||||
|
tdSql.execute('create stream tan_stream into output_tan_stb as select ts, tan(c1), tan(c2), c3 from scalar_stb;')
|
||||||
|
tdSql.query('describe output_tan_stb')
|
||||||
|
tdSql.execute('create stream char_length_stream into output_char_length_stb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_stb;')
|
||||||
|
tdSql.query('describe output_char_length_stb')
|
||||||
|
tdSql.execute('create stream concat_stream into output_concat_stb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_stb;')
|
||||||
|
tdSql.execute('create stream concat_ws_stream into output_concat_ws_stb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_stb;')
|
||||||
|
tdSql.execute('create stream length_stream into output_length_stb as select ts, length(c3), length(c4), length(c5) from scalar_stb;')
|
||||||
|
tdSql.query('describe output_length_stb')
|
||||||
|
tdSql.execute('create stream lower_stream into output_lower_stb as select ts, lower(c3), lower(c4), lower(c5) from scalar_stb;')
|
||||||
|
tdSql.query('describe output_lower_stb')
|
||||||
|
tdSql.execute('create stream ltrim_stream into output_ltrim_stb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_stb;')
|
||||||
|
tdSql.query('describe output_ltrim_stb')
|
||||||
|
tdSql.execute('create stream rtrim_stream into output_rtrim_stb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_stb;')
|
||||||
|
tdSql.query('describe output_rtrim_stb')
|
||||||
|
tdSql.execute('create stream substr_stream into output_substr_stb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_stb;')
|
||||||
|
tdSql.query('describe output_substr_stb')
|
||||||
|
tdSql.execute('create stream upper_stream into output_upper_stb as select ts, upper(c3), upper(c4), upper(c5) from scalar_stb;')
|
||||||
|
tdSql.query('describe output_upper_stb')
|
||||||
|
tdSql.execute('insert into scalar_ct1 values (1653560440733, 100, 100.1, "beijing", "taos", "Taos");')
|
||||||
|
tdSql.execute('insert into scalar_ct1 values (1653560440733+1s, -50, -50.1, "tianjin", "taosdata", "Taosdata");')
|
||||||
|
tdSql.execute('insert into scalar_ct1 values (1653560440733+2s, 0, Null, "hebei", "TDengine", Null);')
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue