Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_last

This commit is contained in:
Hongze Cheng 2022-08-02 05:37:33 +00:00
commit 418166cd92
21 changed files with 562 additions and 121 deletions

View File

@ -40,6 +40,7 @@ def pre_test(){
git reset --hard
cd ${WKC}
git reset --hard
git clean -fxd
'''
script {
if (env.CHANGE_TARGET == 'master') {

View File

@ -2,7 +2,7 @@
# taosws-rs
ExternalProject_Add(taosws-rs
GIT_REPOSITORY https://github.com/taosdata/taosws-rs.git
GIT_TAG 9de599d
GIT_TAG 24b199e
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -37,7 +37,7 @@ TDengine的主要功能如下
- **云原生**通过原生分布式的设计充分利用云平台的优势TDengine 提供了水平扩展能力具备弹性、韧性和可观测性支持k8s部署可运行在公有云、私有云和混合云上。
- **极简时序数据平台**TDengine 内建消息队列、缓存、流式计算等功能,应用无需再集成 Kafka/Redis/HBase/Spark 等软件,大幅降系统的复杂度,降低应用开发和运营维护成本。
- **极简时序数据平台**TDengine 内建消息队列、缓存、流式计算等功能,应用无需再集成 Kafka/Redis/HBase/Spark 等软件,大幅降系统的复杂度,降低应用开发和运营成本。
- **分析能力**:支持 SQL同时为时序数据特有的分析提供SQL扩展。通过超级表、存储计算分离、分区分片、预计算、自定义函数等技术TDengine 具备强大的分析能力。

View File

@ -184,6 +184,7 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
int32_t getJsonValueLen(const char* data);
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, int32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,

View File

@ -28,10 +28,10 @@ extern bool gRaftDetailLog;
#define SYNC_RESP_TTL_MS 10000000
#define SYNC_MAX_BATCH_SIZE 500
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
typedef enum {
SYNC_STRATEGY_NO_SNAPSHOT = 0,

View File

@ -118,6 +118,76 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
return 0;
}
int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
return TSDB_CODE_SUCCESS;
}
if (pColumnInfoData->varmeta.allocLen >= newSize) {
return TSDB_CODE_SUCCESS;
}
if (pColumnInfoData->varmeta.allocLen < newSize) {
char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pColumnInfoData->pData = buf;
pColumnInfoData->varmeta.allocLen = newSize;
}
return TSDB_CODE_SUCCESS;
}
static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData, int32_t itemLen, int32_t numOfRows) {
ASSERT(pColumnInfoData->info.bytes >= itemLen);
size_t start = 1;
// the first item
memcpy(pColumnInfoData->pData, pData, itemLen);
int32_t t = 0;
int32_t count = log(numOfRows)/log(2);
while(t < count) {
int32_t xlen = 1 << t;
memcpy(pColumnInfoData->pData + start * itemLen + pColumnInfoData->varmeta.length, pColumnInfoData->pData, xlen * itemLen);
t += 1;
start += xlen;
}
// the tail part
if (numOfRows > start) {
memcpy(pColumnInfoData->pData + start * itemLen + currentRow * itemLen, pColumnInfoData->pData, (numOfRows - start) * itemLen);
}
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
for(int32_t i = 0; i < numOfRows; ++i) {
pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen;
}
pColumnInfoData->varmeta.length += numOfRows * itemLen;
}
}
int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows) {
ASSERT(pData != NULL && pColumnInfoData != NULL);
int32_t len = pColumnInfoData->info.bytes;
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
len = varDataTLen(pData);
if (pColumnInfoData->varmeta.allocLen < (numOfRows + currentRow) * len) {
int32_t code = colDataReserve(pColumnInfoData, (numOfRows + currentRow) * len);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows);
return TSDB_CODE_SUCCESS;
}
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
int32_t numOfRow2) {
if (numOfRow2 <= 0) return;
@ -1113,15 +1183,12 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
pColumn->varmeta.length = 0;
if (pColumn->varmeta.offset > 0) {
if (pColumn->varmeta.offset != NULL) {
memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows);
}
} else {
if (pColumn->nullbitmap != NULL) {
memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
if (pColumn->pData != NULL) {
memset(pColumn->pData, 0, pColumn->info.bytes * numOfRows);
}
}
}
}
@ -1725,6 +1792,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
}
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
memset(pBuf, 0, sizeof(pBuf));
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf);
if (len >= size - 1) return dumpBuf;
@ -1753,6 +1821,26 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
if (len >= size - 1) return dumpBuf;
break;
case TSDB_DATA_TYPE_BOOL:
len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);
if (len >= size - 1) return dumpBuf;
break;
case TSDB_DATA_TYPE_VARCHAR: {
memset(pBuf, 0, sizeof(pBuf));
char* pData = colDataGetVarData(pColInfoData, j);
int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
memcpy(pBuf, varDataVal(pData), dataSize);
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
if (len >= size - 1) return dumpBuf;
} break;
case TSDB_DATA_TYPE_NCHAR: {
char* pData = colDataGetVarData(pColInfoData, j);
int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
memset(pBuf, 0, sizeof(pBuf));
taosUcs4ToMbs((TdUcs4 *)varDataVal(pData), dataSize, pBuf);
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
if (len >= size - 1) return dumpBuf;
} break;
}
}
len += snprintf(dumpBuf + len, size - len, "\n");

View File

@ -758,7 +758,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
pReader->cost.blockLoadTime += elapsedTime;
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);

View File

@ -695,7 +695,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
SSyncInfo syncInfo = {
.snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
//.snapshotStrategy = SYNC_STRATEGY_NO_SNAPSHOT,
.batchSize = 10,
.batchSize = 1,
.vgId = pVnode->config.vgId,
.isStandBy = pVnode->config.standby,
.syncCfg = pVnode->config.syncCfg,

View File

@ -917,9 +917,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);

View File

@ -571,6 +571,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
}
*numOfExprs = numOfFuncs + numOfGroupKeys;
if (*numOfExprs == 0) {
return NULL;
}
SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
for (int32_t i = 0; i < (*numOfExprs); ++i) {

View File

@ -4171,16 +4171,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
.calTrigger = pSessionNode->window.triggerType};
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
pPhyNode->pConditions, pTaskInfo);
pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {

View File

@ -405,9 +405,15 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
data = (char*)p;
}
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, data,
(data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
if (isNullVal) {
colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
} else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
} else { // todo opt for json tag
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, data, false);
}
}
if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL &&

View File

@ -152,7 +152,7 @@ SSDataBlock* loadNextDataBlock(void* param) {
void applyScalarFunction(SSDataBlock* pBlock, void* param) {
SOperatorInfo* pOperator = param;
SSortOperatorInfo* pSort = pOperator->info;
if (pOperator->exprSupp.pExprInfo != NULL) {
if (pOperator->exprSupp.pExprInfo != NULL && pOperator->exprSupp.numOfExprs > 0) {
int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx,
pOperator->exprSupp.numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -2439,12 +2439,14 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupAggSup(&pInfo->aggSup);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosMemoryFreeClear(param);
}
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
STimeWindowAggSupp* pTwAggSupp, SNode* pCondition,
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
SExecTaskInfo* pTaskInfo) {
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
@ -2455,6 +2457,10 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -2462,16 +2468,19 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
initBasicInfo(&pInfo->binfo, pResBlock);
pInfo->twAggSup = *pTwAggSupp;
pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
pInfo->gap = pSessionNode->gap;
initResultRowInfo(&pInfo->binfo.resultRowInfo);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->tsSlotId = tsSlotId;
pInfo->gap = gap;
pInfo->binfo.pRes = pResBlock;
pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pInfo->binfo.pRes = pResBlock;
pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false;
pInfo->pCondition = pCondition;
pInfo->reptScan = false;
pInfo->pCondition = pSessionNode->window.node.pConditions;
pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
pOperator->blocking = true;

View File

@ -81,10 +81,8 @@ static int32_t addNamespace(STranslateContext* pCxt, void* pTable) {
SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel);
taosArrayPush(pTables, &pTable);
if (hasSameTableAlias(pTables)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf,
TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS,
"Not unique table/alias: '%s'",
((STableNode*)pTable)->tableAlias);
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS,
"Not unique table/alias: '%s'", ((STableNode*)pTable)->tableAlias);
}
} else {
do {
@ -92,10 +90,8 @@ static int32_t addNamespace(STranslateContext* pCxt, void* pTable) {
if (pCxt->currLevel == currTotalLevel) {
taosArrayPush(pTables, &pTable);
if (hasSameTableAlias(pTables)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf,
TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS,
"Not unique table/alias: '%s'",
((STableNode*)pTable)->tableAlias);
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS,
"Not unique table/alias: '%s'", ((STableNode*)pTable)->tableAlias);
}
}
taosArrayPush(pCxt->pNsLevel, &pTables);
@ -1587,6 +1583,7 @@ static EDealRes rewriteExprToGroupKeyFunc(STranslateContext* pCxt, SNode** pNode
strcpy(pFunc->functionName, "_group_key");
strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName);
strcpy(pFunc->node.userAlias, ((SExprNode*)*pNode)->userAlias);
pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode);
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
*pNode = (SNode*)pFunc;
@ -2644,27 +2641,6 @@ static int32_t appendTsForImplicitTsFunc(STranslateContext* pCxt, SSelectStmt* p
return pCxt->errCode;
}
typedef struct SRwriteUniqueCxt {
STranslateContext* pTranslateCxt;
SNode* pExpr;
} SRwriteUniqueCxt;
static EDealRes rewriteSeletcValueFunc(STranslateContext* pCxt, SNode** pNode) {
SFunctionNode* pFirst = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFirst) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
strcpy(pFirst->functionName, "first");
TSWAP(pFirst->pParameterList, ((SFunctionNode*)*pNode)->pParameterList);
strcpy(pFirst->node.aliasName, ((SExprNode*)*pNode)->aliasName);
nodesDestroyNode(*pNode);
*pNode = (SNode*)pFirst;
pCxt->errCode = fmGetFuncInfo(pFirst, pCxt->msgBuf.buf, pCxt->msgBuf.len);
((SSelectStmt*)pCxt->pCurrStmt)->hasAggFuncs = true;
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR;
}
typedef struct SReplaceOrderByAliasCxt {
STranslateContext* pTranslateCxt;
SNodeList* pProjectionList;

View File

@ -1727,16 +1727,11 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
metaGetTableNameByUid(pInput->param, uid, str);
for(int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppend(pOutput->columnData, pOutput->numOfRows + i, str, false);
}
colDataAppendNItems(pOutput->columnData, pOutput->numOfRows, str, pInput->numOfRows);
pOutput->numOfRows += pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
/** Aggregation functions **/
int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
SColumnInfoData *pInputData = pInput->columnData;

View File

@ -31,6 +31,7 @@ void swapStr(char* j, char* J, int width) {
}
#endif
// todo refactor: 1) move away; 2) use merge sort instead; 3) qsort is not a stable sort actually.
void taosSort(void* arr, int64_t sz, int64_t width, __compar_fn_t compar) {
#ifdef WINDOWS
int64_t i, j;

View File

@ -0,0 +1,290 @@
# from asyncio.windows_events import NULL
import taos
import sys
import datetime
import inspect
import random
from util.log import *
from util.sql import *
from util.cases import *
from util.cluster import *
from util.common import *
sys.path.append("./6-cluster/")
from clusterCommonCreate import *
from clusterCommonCheck import clusterComCheck
import threading
class TDTestCase:
clientCfgDict = {'queryproxy': '1','debugFlag': 135}
clientCfgDict["debugFlag"] = 131
updatecfgDict = {'clientCfg': {}}
updatecfgDict = {'debugFlag': 131}
updatecfgDict = {'keepColumnName': 1}
updatecfgDict["clientCfg"] = clientCfgDict
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1):
tsql.execute("use %s" %dbName)
pre_create = "create table"
sql = pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
for i in range(ctbNum):
tagValue = 'beijing'
if (i % 10 == 0):
sql += " %s%d using %s (name,fleet,driver,device_version) tags('truck_%d', 'South%d','Trish%d','v2.%d')"%(ctbPrefix,i,stbName,i,i,i,i)
else:
model = 'H-%d'%i
sql += " %s%d using %s tags('truck_%d', 'South%d','Trish%d','%s','v2.%d')"%(ctbPrefix,i,stbName,i,i,i,model,i)
if (i > 0) and (i%1000 == 0):
tsql.execute(sql)
sql = pre_create
if sql != pre_create:
tsql.execute(sql)
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
return
def prepareData(self):
dbname="db_tsbs"
stabname1="readings"
stabname2="diagnostics"
ctbnamePre1="rct"
ctbnamePre2="dct"
ctbNums=40
self.ctbNums=ctbNums
rowNUms=100
ts=1451606400000
tdSql.execute(f"create database {dbname};")
tdSql.execute(f"use {dbname} ")
tdSql.execute(f'''
create table {stabname1} (ts timestamp,latitude double,longitude double,elevation double,velocity double,heading double,grade double,fuel_consumption double,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30));
''')
tdSql.execute(f'''
create table {stabname2} (ts timestamp,fuel_state double,current_load double,status bigint,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30)) ;
''')
self.create_ctable(tsql=tdSql,dbName=dbname,stbName=stabname1,ctbPrefix=ctbnamePre1,ctbNum=ctbNums)
self.create_ctable(tsql=tdSql,dbName=dbname,stbName=stabname2,ctbPrefix=ctbnamePre2,ctbNum=ctbNums)
for j in range(ctbNums):
for i in range(rowNUms):
tdSql.execute(
f"insert into rct{j} values ( {ts+i*60000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )"
)
status= random.randint(0,1)
tdSql.execute(
f"insert into dct{j} values ( {ts+i*60000}, {1+i*0.1},{1400+i*15}, {status},{1500+i*20}, {150+i*2},{5+i} )"
)
tdSql.execute("insert into dct9 (ts,fuel_state) values('2021-07-13 14:06:33.123Z',1.2) ;")
# def check_avg(self ,origin_query , check_query):
# avg_result = tdSql.getResult(origin_query)
# origin_result = tdSql.getResult(check_query)
# check_status = True
# for row_index , row in enumerate(avg_result):
# for col_index , elem in enumerate(row):
# if avg_result[row_index][col_index] != origin_result[row_index][col_index]:
# check_status = False
# if not check_status:
# tdLog.notice("avg function value has not as expected , sql is \"%s\" "%origin_query )
# sys.exit(1)
# else:
# tdLog.info("avg value check pass , it work as expected ,sql is \"%s\" "%check_query )
def createCluster(self):
tdSql.execute("create mnode on dnode 2")
tdSql.execute("create mnode on dnode 3")
tdSql.execute("create qnode on dnode 1")
tdSql.execute("create qnode on dnode 2")
tdSql.execute("create qnode on dnode 3")
time.sleep(10)
def tsbsIotQuery(self,tdSql):
tdSql.execute("use db_tsbs")
# test interval and partition
tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet; ")
# print(tdSql.queryResult)
parRows=tdSql.queryRows
tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet interval(10m); ")
tdSql.checkRows(parRows)
# # test insert into
# tdSql.execute("create table testsnode (ts timestamp, c1 float,c2 binary(30),c3 binary(30),c4 binary(30)) ;")
# tdSql.query("insert into testsnode SELECT ts,avg(velocity) as mean_velocity,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet,ts interval(10m);")
# tdSql.query("insert into testsnode(ts,c1,c2,c3,c4) SELECT ts,avg(velocity) as mean_velocity,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet,ts interval(10m);")
# test paitition interval fill
tdSql.query("SELECT name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0) ;")
# test partition interval limit (PRcore-TD-17410)
tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings partition BY name,driver,fleet interval (10m) limit 1);")
tdSql.checkRows(self.ctbNums)
# test partition interval Pseudo time-column
tdSql.query("SELECT count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
# 1 high-load:
tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name desc, ts DESC;")
tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name ;")
# 2 stationary-trucks
tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1)")
tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1) WHERE fleet = 'West' AND mean_velocity < 1000 partition BY name")
# 3 long-driving-sessions
tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT _wstart as ts,name,driver,avg(velocity) as mean_velocity FROM readings where ts > '2016-01-01T00:00:34Z' AND ts <= '2016-01-01T04:00:34Z' partition BY name,driver interval(10m)) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 22 ;")
#4 long-daily-sessions
tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT name,driver,avg(velocity) as mean_velocity FROM readings WHERE fleet ='West' AND ts > '2016-01-01T12:31:37Z' AND ts <= '2016-01-05T12:31:37Z' partition BY name,driver interval(10m) ) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 60")
# 5. avg-daily-driving-duration
tdSql.query("select _wstart as ts,fleet,name,driver,count(mv)/6 as hours_driven from ( select _wstart as ts,fleet,name,driver,avg(velocity) as mv from readings where ts > '2016-01-01T00:00:00Z' and ts < '2016-01-05T00:00:01Z' partition by fleet,name,driver interval(10m)) where ts > '2016-01-01T00:00:00Z' and ts < '2016-01-05T00:00:01Z' partition by fleet,name,driver interval(1d) ;")
# # 6. avg-daily-driving-session
# #taosc core dumped
# tdSql.execute("create table random_measure2_1 (ts timestamp,ela float, name binary(40))")
# tdSql.query("SELECT ts,diff(mv) AS difka FROM (SELECT ts,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name,ts interval(10m) fill(value,0)) GROUP BY name,ts;")
# tdSql.query("select name,diff(mv) AS difka FROM (SELECT ts,name,mv FROM (SELECT _wstart as ts,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0))) group BY name ;")
# tdSql.query("SELECT _wstart,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0)")
# 7. avg-load
tdSql.query("SELECT fleet, model,avg(ml) AS mean_load_percentage FROM (SELECT fleet, model,current_load/load_capacity AS ml FROM diagnostics partition BY name, fleet, model) partition BY fleet, model order by fleet ;")
# 8. daily-activity
tdSql.query(" SELECT model,ms1 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
tdSql.query(" SELECT model,ms1 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")
tdSql.query("SELECT _wstart as ts,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")
# 9. breakdown-frequency
# NULL ---count(NULL)=0 expect count(NULL)= 100
tdSql.query("SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where model is null partition BY model,state_changed ")
parRows=tdSql.queryRows
assert parRows != 0 , "query result is wrong"
tdSql.query(" SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where state_changed =1 partition BY model,state_changed ;")
#it's already supported:
# last-loc
tdSql.query("SELECT last_row(ts),latitude,longitude,name,driver FROM readings WHERE fleet='South' and name IS NOT NULL partition BY name,driver order by name ;")
#2. low-fuel
tdSql.query("SELECT last_row(ts),name,driver,fuel_state,driver FROM diagnostics WHERE fuel_state <= 0.1 AND fleet = 'South' and name IS NOT NULL GROUP BY name,driver order by name;")
# 3. avg-vs-projected-fuel-consumption
tdSql.query("select avg(fuel_consumption) as avg_fuel_consumption,avg(nominal_fuel_consumption) as nominal_fuel_consumption from readings where velocity > 1 group by fleet")
def restartFunc(self,func_name,threadNumbers,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db',
'dbNumbers': 8,
'dropFlag': 1,
'event': '',
'vgroups': 2,
'replica': 1,
'stbName': 'stb',
'stbNumbers': 100,
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
}
dnodeNumbers=int(dnodeNumbers)
mnodeNums=int(mnodeNums)
vnodeNumbers = int(dnodeNumbers-mnodeNums)
tdSql.query("show dnodes;")
tdLog.debug(tdSql.queryResult)
clusterComCheck.checkDnodes(dnodeNumbers)
tdLog.info("create database and stable")
tdDnodes=cluster.dnodes
stopcount =0
threads=[]
for i in range(threadNumbers):
newTdSql=tdCom.newTdSql()
print("123")
threads.append(threading.Thread(target=func_name,args=(newTdSql,)))
print("456")
for tr in threads:
tr.start()
tdLog.info("Take turns stopping %s "%stopRole)
while stopcount < restartNumbers:
tdLog.info(" restart loop: %d"%stopcount )
if stopRole == "mnode":
for i in range(mnodeNums):
tdDnodes[i].stoptaosd()
# sleep(10)
tdDnodes[i].starttaosd()
# sleep(10)
elif stopRole == "vnode":
for i in range(vnodeNumbers):
tdDnodes[i+mnodeNums].stoptaosd()
# sleep(10)
tdDnodes[i+mnodeNums].starttaosd()
# sleep(10)
elif stopRole == "dnode":
for i in range(dnodeNumbers):
tdDnodes[i].stoptaosd()
# sleep(10)
tdDnodes[i].starttaosd()
# sleep(10)
# dnodeNumbers don't include database of schema
if clusterComCheck.checkDnodes(dnodeNumbers):
tdLog.info("check dnodes status is ready")
else:
tdLog.info("check dnodes status is not ready")
self.stopThread(threads)
tdLog.exit("one or more of dnodes failed to start ")
# self.check3mnode()
stopcount+=1
for tr in threads:
tr.join()
def run(self):
tdLog.printNoPrefix("==========step1:create database and table,insert data ==============")
self.createCluster()
self.prepareData()
queryPolicy=2
simClientCfg="%s/taos.cfg"%tdDnodes.getSimCfgPath()
cmd='sed -i "s/^queryPolicy.*/queryPolicy 2/g" %s'%simClientCfg
os.system(cmd)
# self.tsbsIotQuery()
self.restartFunc(func_name=self.tsbsIotQuery,threadNumbers=3,dnodeNumbers=5,mnodeNums=3,restartNumbers=3,stopRole='mnode')
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -318,7 +318,6 @@ class TDTestCase:
os.system(cmd)
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql.execute("reset query cache")
tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy)
tdSql.query("show local variables;")
for i in range(tdSql.queryRows):

View File

@ -1,3 +1,4 @@
# from asyncio.windows_events import NULL
import taos
import sys
import datetime
@ -21,38 +22,99 @@ class TDTestCase:
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
def prepareData(self):
database="db_tsbs"
ts=1451606400000
tdSql.execute(f"create database {database};")
tdSql.execute(f"use {database} ")
tdSql.execute('''
create table readings (ts timestamp,latitude double,longitude double,elevation double,velocity double,heading double,grade double,fuel_consumption double,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30));
''')
tdSql.execute('''
create table diagnostics (ts timestamp,fuel_state double,current_load double,status bigint,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30)) ;
''')
def create_ctable(self,tsql=None, dbName='db',stbName='stb',ctbPrefix='ctb',ctbNum=1):
tsql.execute("use %s" %dbName)
pre_create = "create table"
sql = pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
for i in range(ctbNum):
tagValue = 'beijing'
if (i % 10 == 0):
sql += " %s%d using %s (name,fleet,driver,device_version) tags('truck_%d', 'South%d','Trish%d','v2.%d')"%(ctbPrefix,i,stbName,i,i,i,i)
else:
model = 'H-%d'%i
sql += " %s%d using %s tags('truck_%d', 'South%d','Trish%d','%s','v2.%d')"%(ctbPrefix,i,stbName,i,i,i,model,i)
if (i > 0) and (i%1000 == 0):
tsql.execute(sql)
sql = pre_create
if sql != pre_create:
tsql.execute(sql)
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
return
for i in range(10):
if i == 1 or i == 2 :
tdLog.debug(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}', NULL,'v2.3')")
tdSql.execute(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}', NULL,'v2.3')")
else :
tdSql.execute(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}','H-{i}','v2.3')")
if i == 1 or i == 2 :
tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}',NULL ,'v2.3')")
else:
tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}','H-{i}','v2.3')")
for j in range(10):
for i in range(100):
tdSql.execute(
f"insert into rct{j} values ( {ts+i*60000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )"
)
status= random.randint(0,1)
tdSql.execute(
f"insert into dct{j} values ( {ts+i*60000}, {1+i*0.1},{1400+i*15}, {status},{1500+i*20}, {150+i*2},{5+i} )"
)
tdSql.execute("insert into dct9 (ts,fuel_state) values('2021-07-13 14:06:33.123Z',1.2) ;")
def insertData(self,startTs,tsql=None, dbName='db',stbName='stb',ctbPrefix='ctb',ctbNum=1,rowsPerTbl=100,batchNum=1000):
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
if startTs is None:
t = time.time()
startTs = int(round(t * 1000))
for i in range(ctbNum):
sql += " %s%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
if(ctbPrefix=="rct"):
sql += f"({startTs+j*60000}, {80+j}, {90+j}, {85+j}, {30+j*10}, {1.2*j}, {221+j*2}, {20+j*0.2}, {1500+j*20}, {150+j*2},{5+j}) "
elif ( ctbPrefix=="dct"):
status= random.randint(0,1)
sql += f"( {startTs+j*60000}, {1+j*0.1},{1400+j*15}, {status},{1500+j*20}, {150+j*2},{5+j} ) "
# tdLog.debug("1insert sql:%s"%sql)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
# tdLog.debug("2insert sql:%s"%sql)
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
else:
sql = "insert into "
if sql != pre_insert:
# tdLog.debug("3insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def prepareData(self):
dbname="db_tsbs"
stabname1="readings"
stabname2="diagnostics"
ctbnamePre1="rct"
ctbnamePre2="dct"
ctbNums=40
self.ctbNums=ctbNums
rowNUms=200
ts=1451606400000
tdSql.execute(f"create database {dbname};")
tdSql.execute(f"use {dbname} ")
tdSql.execute(f'''
create table {stabname1} (ts timestamp,latitude double,longitude double,elevation double,velocity double,heading double,grade double,fuel_consumption double,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30));
''')
tdSql.execute(f'''
create table {stabname2} (ts timestamp,fuel_state double,current_load double,status bigint,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30)) ;
''')
self.create_ctable(tsql=tdSql,dbName=dbname,stbName=stabname1,ctbPrefix=ctbnamePre1,ctbNum=ctbNums)
self.create_ctable(tsql=tdSql,dbName=dbname,stbName=stabname2,ctbPrefix=ctbnamePre2,ctbNum=ctbNums)
self.insertData(tsql=tdSql,dbName=dbname,stbName=stabname1,ctbPrefix=ctbnamePre1,ctbNum=ctbNums,rowsPerTbl=rowNUms,startTs=ts,batchNum=1000)
self.insertData(tsql=tdSql,dbName=dbname,stbName=stabname2,ctbPrefix=ctbnamePre2,ctbNum=ctbNums,rowsPerTbl=rowNUms,startTs=ts,batchNum=1000)
# for i in range(ctbNum):
# if i %10 == 0 :
# # tdLog.debug(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}', NULL,'v2.3')")
# tdSql.execute(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}', NULL,'v2.3')")
# else :
# tdSql.execute(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}','H-{i}','v2.3')")
# if i %10 == 0 :
# tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}',NULL ,'v2.3')")
# else:
# tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}','H-{i}','v2.3')")
# for j in range(ctbNums):
# for i in range(rowNUms):
# tdSql.execute(
# f"insert into rct{j} values ( {ts+i*60000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )"
# )
# status= random.randint(0,1)
# tdSql.execute(
# f"insert into dct{j} values ( {ts+i*60000}, {1+i*0.1},{1400+i*15}, {status},{1500+i*20}, {150+i*2},{5+i} )"
# )
# tdSql.execute("insert into dct9 (ts,fuel_state) values('2021-07-13 14:06:33.123Z',1.2) ;")
# def check_avg(self ,origin_query , check_query):
# avg_result = tdSql.getResult(origin_query)
# origin_result = tdSql.getResult(check_query)
@ -75,7 +137,6 @@ class TDTestCase:
# test interval and partition
tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet; ")
print(tdSql.queryResult)
parRows=tdSql.queryRows
tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet interval(10m); ")
tdSql.checkRows(parRows)
@ -94,7 +155,7 @@ class TDTestCase:
# test partition interval limit (PRcore-TD-17410)
tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings partition BY name,driver,fleet interval (10m) limit 1);")
tdSql.checkRows(10)
tdSql.checkRows(self.ctbNums)
# test partition interval Pseudo time-column
tdSql.query("SELECT count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
@ -136,17 +197,27 @@ class TDTestCase:
tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")
tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")
tdSql.query("SELECT _wstart as ts,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")
# 9. breakdown-frequency
# NULL ---count(NULL)=0 expect count(NULL)= 100
tdSql.query("SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where model is null partition BY model,state_changed ")
parRows=tdSql.queryRows
assert parRows != 0 , "query result is wrong"
assert parRows != 0 , "query result is wrong, query rows %d but expect > 0 " %parRows
tdSql.query(" SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where state_changed =1 partition BY model,state_changed ;")
sql="select model,ctc from (SELECT model,count(state_changed) as ctc FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= 1451606400000 AND ts < 1451952001000 ) WHERE ts >= 1451606400000 AND ts < 1451952001000 partition BY model interval(10m)) partition BY model) WHERE state_changed = 1 partition BY model )where model is null;"
# for i in range(2):
# tdSql.query("%s"%sql)
# quertR1=tdSql.queryResult
# for j in range(50):
# tdSql.query("%s"%sql)
# quertR2=tdSql.queryResult
# assert quertR1 == quertR2 , "%s != %s ,The results of multiple queries are different" %(quertR1,quertR2)
#it's already supported:
# last-loc

View File

@ -33,10 +33,11 @@ int shell_conn_ws_server(bool first) {
return 0;
}
static int horizontalPrintWebsocket(WS_RES* wres) {
static int horizontalPrintWebsocket(WS_RES* wres, double* execute_time) {
const void* data = NULL;
int rows;
ws_fetch_block(wres, &data, &rows);
*execute_time += (double)(ws_take_timing(wres)/1E6);
if (!rows) {
return 0;
}
@ -72,10 +73,11 @@ static int horizontalPrintWebsocket(WS_RES* wres) {
return numOfRows;
}
static int verticalPrintWebsocket(WS_RES* wres) {
static int verticalPrintWebsocket(WS_RES* wres, double* pexecute_time) {
int rows = 0;
const void* data = NULL;
ws_fetch_block(wres, &data, &rows);
*pexecute_time += (double)(ws_take_timing(wres)/1E6);
if (!rows) {
return 0;
}
@ -112,7 +114,7 @@ static int verticalPrintWebsocket(WS_RES* wres) {
return numOfRows;
}
static int dumpWebsocketToFile(const char* fname, WS_RES* wres) {
static int dumpWebsocketToFile(const char* fname, WS_RES* wres, double* pexecute_time) {
char fullname[PATH_MAX] = {0};
if (taosExpandDir(fname, fullname, PATH_MAX) != 0) {
tstrncpy(fullname, fname, PATH_MAX);
@ -127,6 +129,7 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres) {
int rows = 0;
const void* data = NULL;
ws_fetch_block(wres, &data, &rows);
*pexecute_time += (double)(ws_take_timing(wres)/1E6);
if (!rows) {
taosCloseFile(&pFile);
return 0;
@ -162,14 +165,14 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres) {
return numOfRows;
}
static int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical) {
static int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical, double* pexecute_time) {
int numOfRows = 0;
if (fname != NULL) {
numOfRows = dumpWebsocketToFile(fname, wres);
numOfRows = dumpWebsocketToFile(fname, wres, pexecute_time);
} else if (vertical) {
numOfRows = verticalPrintWebsocket(wres);
numOfRows = verticalPrintWebsocket(wres, pexecute_time);
} else {
numOfRows = horizontalPrintWebsocket(wres);
numOfRows = horizontalPrintWebsocket(wres, pexecute_time);
}
*error_no = ws_errno(wres);
return numOfRows;
@ -225,6 +228,8 @@ void shellRunSingleCommandWebsocketImp(char *command) {
return;
}
double execute_time = ws_take_timing(res)/1E6;
if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
fprintf(stdout, "Database changed.\r\n\r\n");
fflush(stdout);
@ -236,22 +241,27 @@ void shellRunSingleCommandWebsocketImp(char *command) {
if (ws_is_update_query(res)) {
numOfRows = ws_affected_rows(res);
et = taosGetTimestampUs();
printf("Query Ok, %d of %d row(s) in database (%.6fs)\n", numOfRows, numOfRows,
(et - st)/1E6);
double total_time = (et - st)/1E3;
double net_time = total_time - (double)execute_time;
printf("Query Ok, %d of %d row(s) in database\n", numOfRows, numOfRows);
printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time);
} else {
int error_no = 0;
numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode);
numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode, &execute_time);
if (numOfRows < 0) {
ws_free_result(res);
return;
}
et = taosGetTimestampUs();
double total_time = (et - st) / 1E3;
double net_time = total_time - execute_time;
if (error_no == 0 && !shell.stop_query) {
printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows,
(et - st)/1E6);
printf("Query OK, %d row(s) in set\n", numOfRows);
printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time);
} else {
printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows,
(et - st)/1E6);
printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time);
}
}
printf("\n");