feature: optimize interval with limit

wangjiaming0909 2023-08-03 18:05:52 +08:00
parent c57defa1c9
commit fba43e1748
11 changed files with 607 additions and 163 deletions

View File

@@ -25,6 +25,7 @@ extern "C" {
#include "tsort.h"
#include "ttszip.h"
#include "tvariant.h"
#include "theap.h"
#include "dataSinkMgt.h"
#include "executil.h"
@@ -418,6 +419,14 @@ typedef struct SIntervalAggOperatorInfo {
  EOPTR_EXEC_MODEL   execModel;    // operator execution model [batch model|stream model]
  STimeWindowAggSupp twAggSup;
  SArray*            pPrevValues;  // SArray<SGroupKeys> used to keep the previous non-null values for interpolation
  // for limit optimization
  bool               limited;
  int64_t            limit;
  bool               slimited;
  int64_t            slimit;
  uint64_t           curGroupId;   // initialized to UINT64_MAX
  uint64_t           handledGroupNum;
  BoundedQueue*      pBQ;
} SIntervalAggOperatorInfo;
typedef struct SMergeAlignedIntervalAggOperatorInfo {

View File

@@ -2118,8 +2118,9 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
  if (groupSort) {
  if (groupSort || pScanNode->groupOrderScan) {
    code = sortTableGroup(pTableListInfo);
  }
}

View File

@@ -275,7 +275,6 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
                              SNode* pTagIndexCond, const char* pUser, const char* dbname) {
  int32_t     type = nodeType(pPhyNode);
  const char* idstr = GET_TASKID(pTaskInfo);
  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
    SOperatorInfo* pOperator = NULL;
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {

View File

@@ -848,6 +848,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
      return result;
    }
    while (1) {
      if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
        setOperatorCompleted(pOperator);
        return NULL;
@@ -869,9 +870,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
      if (result != NULL) {
        return result;
      }
      setOperatorCompleted(pOperator);
      return NULL;
    }
  }
}

View File

@@ -876,7 +876,67 @@ bool needDeleteWindowBuf(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) {
  return pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark;
}

static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,

static bool tsKeyCompFn(void* l, void* r, void* param) {
  TSKEY*                    lTS = (TSKEY*)l;
  TSKEY*                    rTS = (TSKEY*)r;
  SIntervalAggOperatorInfo* pInfo = param;
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
}

static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);
  return tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))) != NULL;
}

/**
 * @brief check if the current window should be filtered out by limit info
 * @retval true if the window should be filtered out
 * @retval false otherwise
 * @note If there is no limit info, we skip filtering.
 *       If the input and output ts orders mismatch, we skip filtering too.
 *       e.g. input ts order: desc, output ts order: asc, limit: 10.
 *       The interval operator should output the first 10 windows, but we cannot determine the first 10 windows
 *       until every tuple in every block has been scanned, so the bounded queue would keep being refreshed with
 *       records that have smaller ts keys.
 */
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId) {
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
      || pOperatorInfo->binfo.inputTsOrder !=
             pOperatorInfo->binfo.outputTsOrder  // if input/output ts order mismatch, no filter
  ) {
    return false;
  }
  if (pOperatorInfo->limit == 0) return true;

  if (pOperatorInfo->pBQ == NULL) {
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosMemoryFree, pOperatorInfo);
  }

  bool shouldFilter = false;
  // if the BQ is already full, compare the current window against the top of the BQ
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
  }

  if (shouldFilter) {
    return true;
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
    return false;
  }

  // the current window has neither been filtered out nor been pushed into the BQ yet, so push it now
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
  *((TSKEY*)node.data) = win->skey;
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
    taosMemoryFree(node.data);
    return true;
  }
  return false;
}
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                            int32_t scanFlag) {
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
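A note on the BoundedQueue usage in filterWindowWithLimit above: for an ascending query with LIMIT n, only the n smallest window start keys can ever reach the output, so once the queue is full a new window either evicts the current worst key or is filtered out. Below is a standalone sketch of that idea; it is illustrative only: the names are hypothetical, a linear scan stands in for the heap from theap.h, and the already-calculated-window check and the limit-1 sizing of the real code are omitted.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef int64_t TSKEY;

typedef struct {
  TSKEY keys[8]; /* capacity = the query's limit (8 is just the demo bound) */
  int   size;
  int   cap;
} DemoBoundedSet;

/* returns true if the window with this start key should be filtered out */
static bool demoFilter(DemoBoundedSet* s, TSKEY key) {
  if (s->size < s->cap) { /* not full yet: always keep */
    s->keys[s->size++] = key;
    return false;
  }
  int maxIdx = 0; /* find the current worst (largest) key */
  for (int i = 1; i < s->size; ++i) {
    if (s->keys[i] > s->keys[maxIdx]) maxIdx = i;
  }
  if (key >= s->keys[maxIdx]) return true; /* not among the n smallest: filter */
  s->keys[maxIdx] = key;                   /* evict the worst key, keep this one */
  return false;
}

int main(void) {
  DemoBoundedSet s = {.size = 0, .cap = 3}; /* LIMIT 3, ascending output */
  TSKEY          in[] = {50, 10, 40, 20, 60, 30};
  for (int i = 0; i < 6; ++i) {
    printf("key %lld -> %s\n", (long long)in[i], demoFilter(&s, in[i]) ? "filtered" : "kept");
  }
  return 0; /* the set ends up holding the 3 smallest keys: 10, 20, 30 */
}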
@@ -891,8 +951,21 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

  if (tableGroupId != pInfo->curGroupId) {
    pInfo->handledGroupNum += 1;
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
      return true;
    } else {
      pInfo->curGroupId = tableGroupId;
      destroyBoundedQueue(pInfo->pBQ);
      pInfo->pBQ = NULL;
    }
  }

  STimeWindow win =
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
  if (filterWindowWithLimit(pInfo, &win, tableGroupId)) return false;

  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
@@ -929,7 +1002,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->binfo.inputTsOrder);
    if (startPos < 0) {
    if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId)) {
      break;
    }
    // null data, failed to allocate more memory buffer
@@ -963,6 +1036,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
  if (pInfo->timeWindowInterpo) {
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
  }
  return false;
}
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
@@ -1043,7 +1117,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
    // the pDataBlock are always the same one, no need to call this again
    setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
    hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag);
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
  }

  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
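hashIntervalAgg's new bool return value tells doOpenIntervalAgg to stop pulling blocks once more than slimit groups have been seen; this is only safe because the scan now emits blocks in group order (groupOrderScan). A self-contained sketch of that contract, with all names hypothetical:

#include <stdbool.h>
#include <stdio.h>

typedef struct { int groupId; } DemoBlock;

static DemoBlock demoBlocks[] = {{1}, {1}, {2}, {3}, {4}}; /* group-ordered input */
static int       demoCursor = 0;
static int       demoSlimit = 2;  /* SLIMIT 2: handle at most 2 groups */
static int       demoHandled = 0;
static int       demoCurGroup = -1;

static DemoBlock* demoNextBlock(void) {
  return demoCursor < 5 ? &demoBlocks[demoCursor++] : NULL;
}

/* mirrors the new contract: returns true once more than slimit groups were seen */
static bool demoHashIntervalAgg(DemoBlock* pBlock) {
  if (pBlock->groupId != demoCurGroup) {
    if (++demoHandled > demoSlimit) return true;
    demoCurGroup = pBlock->groupId;
  }
  printf("aggregated a block of group %d\n", pBlock->groupId);
  return false;
}

int main(void) {
  DemoBlock* pBlock = NULL;
  while ((pBlock = demoNextBlock()) != NULL) {
    /* because groups arrive in order, no later block can belong to groups 1 or 2 */
    if (demoHashIntervalAgg(pBlock)) break;
  }
  return 0; /* groups 3 and 4 are never aggregated */
}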
@@ -1495,6 +1569,7 @@ void destroyIntervalOperatorInfo(void* param) {
  cleanupGroupResInfo(&pInfo->groupResInfo);
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
  destroyBoundedQueue(pInfo->pBQ);
  taosMemoryFreeClear(param);
}
@@ -1658,6 +1733,17 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
  pInfo->interval = interval;
  pInfo->twAggSup = as;
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;

  if (pPhyNode->window.node.pLimit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
    pInfo->limited = true;
    pInfo->limit = pLimit->limit + pLimit->offset;
  }
  if (pPhyNode->window.node.pSlimit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
    pInfo->slimited = true;
    pInfo->slimit = pLimit->limit + pLimit->offset;
    pInfo->curGroupId = UINT64_MAX;
  }
  if (pPhyNode->window.pExprs != NULL) {
    int32_t numOfScalar = 0;
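Note the folding in the initialization above: the operator budgets for limit + offset windows (and slimit + offset groups), since the OFFSET is still applied later in the pipeline rather than by this operator. A trivial standalone illustration, with DemoLimit as a hypothetical stand-in for SLimitNode; the planner's cloneLimit further below performs the same fold:

#include <stdint.h>
#include <stdio.h>

/* hypothetical stand-in for SLimitNode */
typedef struct { int64_t limit, offset; } DemoLimit;

int main(void) {
  DemoLimit l = {.limit = 10, .offset = 5};  /* LIMIT 5,10              */
  int64_t budget = l.limit + l.offset;       /* operator keeps 15 windows */
  printf("window budget: %lld\n", (long long)budget);
  return 0;
}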

View File

@@ -847,7 +847,6 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
                     : (pSelect->hasTimeLineFunc ? getRequireDataOrder(true, pSelect) : DATA_ORDER_LEVEL_IN_BLOCK);
  pWindow->node.resultDataOrder =
      pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_GLOBAL : getRequireDataOrder(true, pSelect);
  pWindow->pTspk = nodesCloneNode(pInterval->pCol);
  if (NULL == pWindow->pTspk) {
    nodesDestroyNode((SNode*)pWindow);

View File

@@ -1546,11 +1546,33 @@ static bool planOptNodeListHasTbname(SNodeList* pKeys) {
}

static bool partTagsIsOptimizableNode(SLogicNode* pNode) {
  return ((QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) ||
           (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && NULL != ((SAggLogicNode*)pNode)->pGroupKeys &&
            NULL != ((SAggLogicNode*)pNode)->pAggFuncs)) &&
          1 == LIST_LENGTH(pNode->pChildren) &&
          QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0)));
  bool ret = 1 == LIST_LENGTH(pNode->pChildren) &&
             QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0));
  if (!ret) return ret;
  switch (nodeType(pNode)) {
    case QUERY_NODE_LOGIC_PLAN_PARTITION: {
      if (pNode->pParent && nodeType(pNode->pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) {
        SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode->pParent;
        if (pWindow->winType == WINDOW_TYPE_INTERVAL) {
          // if the interval has a slimit, we push the partition node down to the scan, and the scan will set
          // groupOrderScan to true, so that whole groups of blocks can be skipped once the slimit is satisfied;
          // if the interval only has a limit, we do not push the partition node down to the scan, because we
          // want grouped output from the partition node so the limit can be exploited;
          // if there is neither a slimit nor a limit, we push the partition node down and groupOrderScan stays
          // false, because group-ordered output is not needed
          if (!pWindow->node.pSlimit && pWindow->node.pLimit) ret = false;
        }
      }
    } break;
    case QUERY_NODE_LOGIC_PLAN_AGG: {
      SAggLogicNode* pAgg = (SAggLogicNode*)pNode;
      ret = pAgg->pGroupKeys && pAgg->pAggFuncs;
    } break;
    default:
      ret = false;
      break;
  }
  return ret;
}
static SNodeList* partTagsGetPartKeys(SLogicNode* pNode) {
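The partition-under-interval rule in the comment above condenses to: keep the partition node only when the window carries a LIMIT but no SLIMIT; in every other case the partition is pushed down to the scan. A hypothetical, runnable condensation of that predicate:

#include <stdbool.h>
#include <stdio.h>

/* mirrors: if (!pWindow->node.pSlimit && pWindow->node.pLimit) ret = false; */
static bool demoPushPartitionDown(bool hasSlimit, bool hasLimit) {
  if (!hasSlimit && hasLimit) return false;  /* keep the partition node       */
  return true;                               /* push the partition to the scan */
}

int main(void) {
  printf("slimit only: %d\n", demoPushPartitionDown(true, false));   /* 1 */
  printf("limit only : %d\n", demoPushPartitionDown(false, true));   /* 0 */
  printf("both       : %d\n", demoPushPartitionDown(true, true));    /* 1 */
  printf("neither    : %d\n", demoPushPartitionDown(false, false));  /* 1 */
  return 0;
}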
@@ -1691,6 +1713,8 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
      scanPathOptSetGroupOrderScan(pScan);
      pParent->hasGroupKeyOptimized = true;
    }
    if (pNode->pParent->pSlimit)
      pScan->groupOrderScan = true;
    NODES_CLEAR_LIST(pNode->pChildren);
    nodesDestroyNode((SNode*)pNode);
@@ -2644,21 +2668,77 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
}

static bool pushDownLimitOptShouldBeOptimized(SLogicNode* pNode) {
  if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren)) {
  if ((NULL == pNode->pLimit && pNode->pSlimit == NULL) || 1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
  // push down to the sort node
  if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
    // if we have already pushed down, skip it
    if (pChild->pLimit) return false;
  } else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) {
    // push down to the table scan node
    // if pNode is a sort node, we skip pushing the limit info down to the table scan node
  if (pChild->pLimit || pChild->pSlimit) return false;
  return true;
}

static void swapLimit(SLogicNode* pParent, SLogicNode* pChild) {
  pChild->pLimit = pParent->pLimit;
  pParent->pLimit = NULL;
}

static void cloneLimit(SLogicNode* pParent, SLogicNode* pChild) {
  SLimitNode* pLimit = NULL;
  if (pParent->pLimit) {
    pChild->pLimit = nodesCloneNode(pParent->pLimit);
    pLimit = (SLimitNode*)pChild->pLimit;
    pLimit->limit += pLimit->offset;
    pLimit->offset = 0;
  }
  if (pParent->pSlimit) {
    pChild->pSlimit = nodesCloneNode(pParent->pSlimit);
    pLimit = (SLimitNode*)pChild->pSlimit;
    pLimit->limit += pLimit->offset;
    pLimit->offset = 0;
  }
}
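Unlike swapLimit, which moves the limit from parent to child, cloneLimit copies it and folds the offset into the child's limit so the child never skips rows itself; the parent still applies the original offset. A standalone sketch with a hypothetical DemoLimit type:

#include <stdint.h>
#include <stdio.h>

/* hypothetical stand-in for SLimitNode */
typedef struct { int64_t limit, offset; } DemoLimit;

/* mirrors cloneLimit's arithmetic: the child keeps limit+offset rows, offset 0 */
static DemoLimit demoCloneLimit(DemoLimit parent) {
  return (DemoLimit){.limit = parent.limit + parent.offset, .offset = 0};
}

int main(void) {
  DemoLimit parent = {.limit = 2, .offset = 6};  /* LIMIT 6,2 */
  DemoLimit child  = demoCloneLimit(parent);
  /* child emits rows 1..8; the parent skips 6 and returns rows 7..8 */
  printf("child: limit=%lld offset=%lld\n", (long long)child.limit, (long long)child.offset);
  return 0;
}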
static bool pushDownLimitHow(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPushTo);

static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPushTo) {
  switch (nodeType(pNodeLimitPushTo)) {
    case QUERY_NODE_LOGIC_PLAN_WINDOW: {
      SWindowLogicNode* pWindow = (SWindowLogicNode*)pNodeLimitPushTo;
      if (pWindow->winType != WINDOW_TYPE_INTERVAL) break;
      cloneLimit(pNodeWithLimit, pNodeLimitPushTo);
      return true;
    }
    case QUERY_NODE_LOGIC_PLAN_FILL:
    case QUERY_NODE_LOGIC_PLAN_SORT: {
      cloneLimit(pNodeWithLimit, pNodeLimitPushTo);
      SNode* pChild = NULL;
      FOREACH(pChild, pNodeLimitPushTo->pChildren) { pushDownLimitHow(pNodeLimitPushTo, (SLogicNode*)pChild); }
      return true;
    }
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT && pNodeWithLimit->pLimit) {
        swapLimit(pNodeWithLimit, pNodeLimitPushTo);
        return true;
      }
    default:
      break;
  }
  return false;
}
  return true;

static bool pushDownLimitHow(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPushTo) {
  switch (nodeType(pNodeWithLimit)) {
    case QUERY_NODE_LOGIC_PLAN_PROJECT:
    case QUERY_NODE_LOGIC_PLAN_FILL:
      return pushDownLimitTo(pNodeWithLimit, pNodeLimitPushTo);
    case QUERY_NODE_LOGIC_PLAN_SORT: {
      SSortLogicNode* pSort = (SSortLogicNode*)pNodeWithLimit;
      if (sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys)) return pushDownLimitTo(pNodeWithLimit, pNodeLimitPushTo);
    }
    default:
      break;
  }
  return false;
}
static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
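Taken together, pushDownLimitHow and pushDownLimitTo form a small dispatch matrix: projects, fills, and primary-key sorts may push limit info down; interval windows, fills, and sorts accept a cloned copy; a scan accepts a moved limit only from a project. A rough, hypothetical truth table as runnable C (not exhaustive; for instance, the scan case also requires the project to actually carry a limit, and only interval windows qualify):

#include <stdbool.h>
#include <stdio.h>

/* hypothetical node tags condensing the dispatch above */
typedef enum { N_PROJECT, N_FILL, N_SORT_PK, N_SORT_OTHER, N_INTERVAL_WINDOW, N_SCAN } DemoNode;

/* which parents may push their limit info down at all */
static bool demoParentCanPush(DemoNode parent) {
  return parent == N_PROJECT || parent == N_FILL || parent == N_SORT_PK;
}

/* which children accept it (clone for window/fill/sort, swap for scan) */
static bool demoChildAccepts(DemoNode parent, DemoNode child) {
  switch (child) {
    case N_INTERVAL_WINDOW:
    case N_FILL:
    case N_SORT_PK:
    case N_SORT_OTHER:
      return true;
    case N_SCAN:
      return parent == N_PROJECT; /* swapLimit: project-with-limit only */
    default:
      return false;
  }
}

static bool demoPushDownLimitHow(DemoNode parent, DemoNode child) {
  return demoParentCanPush(parent) && demoChildAccepts(parent, child);
}

int main(void) {
  printf("project -> interval window: %d\n", demoPushDownLimitHow(N_PROJECT, N_INTERVAL_WINDOW)); /* 1 */
  printf("pk sort -> fill           : %d\n", demoPushDownLimitHow(N_SORT_PK, N_FILL));            /* 1 */
  printf("other sort -> scan        : %d\n", demoPushDownLimitHow(N_SORT_OTHER, N_SCAN));         /* 0 */
  return 0;
}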
@@ -2669,17 +2749,9 @@ static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
  nodesDestroyNode(pChild->pLimit);
  if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
    pChild->pLimit = nodesCloneNode(pNode->pLimit);
    SLimitNode* pLimit = (SLimitNode*)pChild->pLimit;
    pLimit->limit += pLimit->offset;
    pLimit->offset = 0;
  } else {
    pChild->pLimit = pNode->pLimit;
    pNode->pLimit = NULL;
  }
  if (pushDownLimitHow(pNode, pChild)) {
    pCxt->optimized = true;
  }
  return TSDB_CODE_SUCCESS;
}
@@ -2980,6 +3052,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
  {.pName = "sortNonPriKeyOptimize", .optimizeFunc = sortNonPriKeyOptimize},
  {.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize},
  {.pName = "SmaIndex", .optimizeFunc = smaIndexOptimize},
  {.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize},
  {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
  {.pName = "MergeProjects", .optimizeFunc = mergeProjectsOptimize},
  {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
@@ -2988,7 +3061,6 @@ static const SOptimizeRule optimizeRuleSet[] = {
  {.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
  {.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize},
  {.pName = "TagScan", .optimizeFunc = tagScanOptimize},
  {.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize},
  {.pName = "TableCountScan", .optimizeFunc = tableCountScanOptimize},
};
// clang-format on

View File

@@ -498,6 +498,18 @@ static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* p
      }
      break;
    }
    case QUERY_NODE_LOGIC_PLAN_WINDOW: {
      SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
      if (pMerge->node.pLimit) {
        nodesDestroyNode(pMerge->node.pLimit);
        pMerge->node.pLimit = NULL;
      }
      if (pMerge->node.pSlimit) {
        nodesDestroyNode(pMerge->node.pSlimit);
        pMerge->node.pSlimit = NULL;
      }
      break;
    }
    default:
      break;
  }

View File

@@ -25,6 +25,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py

View File

@@ -1603,58 +1603,58 @@ QUERY_PLAN: Time Range: [-9223372036854775808,
taos> select _wstart, last(ts), avg(c2) from meters interval(10s) order by _wstart desc;
_wstart | last(ts) | avg(c2) |
================================================================================
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000 |
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000 |
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000 |
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000 |
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000 |
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000 |
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000 |
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000 |
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000 |
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000 |
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000000000 |
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000000000 |
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000000000 |
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000000000 |
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000000000 |
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000000000 |
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000000000 |
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000000000 |
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000000000 |
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000000000 |
taos> select _wstart, last(ts), avg(c2) from meters interval(10s) order by _wstart asc;
_wstart | last(ts) | avg(c2) |
================================================================================
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000 |
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000 |
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000 |
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000 |
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000 |
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000 |
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000 |
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000 |
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000 |
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000 |
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000000000 |
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000000000 |
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000000000 |
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000000000 |
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000000000 |
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000000000 |
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000000000 |
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000000000 |
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000000000 |
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000000000 |
taos> select _wstart, first(ts), avg(c2) from meters interval(10s) order by _wstart asc;
_wstart | first(ts) | avg(c2) |
================================================================================
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000 |
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000 |
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000 |
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000 |
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000 |
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000 |
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000 |
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000 |
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000 |
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000 |
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000000000 |
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000000000 |
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000000000 |
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000000000 |
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000000000 |
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000000000 |
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000000000 |
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000000000 |
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000000000 |
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000000000 |
taos> select _wstart, first(ts), avg(c2) from meters interval(10s) order by _wstart desc;
_wstart | first(ts) | avg(c2) |
================================================================================
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000 |
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000 |
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000 |
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000 |
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000 |
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000 |
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000 |
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000 |
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000 |
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000 |
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000000000 |
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000000000 |
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000000000 |
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000000000 |
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000000000 |
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000000000 |
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000000000 |
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000000000 |
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000000000 |
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000000000 |
taos> select last(a) as d from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s)) order by d;
d |
@@ -1792,35 +1792,35 @@ taos> select last(b) as d from (select last(ts) as b, avg(c2) as c from meters i
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by d desc;
_wstart | d | avg(c) |
================================================================================
2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000 |
2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000 |
2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000 |
2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000 |
2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000 |
2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.600000000 |
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 |
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 |
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 |
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 |
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 |
2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000 |
2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000 |
2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000 |
2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000 |
2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000 |
2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.400000000 |
2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000 |
2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.800000000 |
2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000 |
2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000 |
2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.800000000 |
2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000 |
2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.600000000 |
2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000 |
2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000 |
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 |
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 |
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 |
2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000000000 |
2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000000000 |
2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000000000 |
2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000000000 |
2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000000000 |
2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.599999999999994 |
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.199999999999989 |
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000000011 |
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000000006 |
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000000000 |
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000000000 |
2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000000000 |
2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000000000 |
2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000000000 |
2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000000000 |
2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000000003 |
2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.399999999999999 |
2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000000001 |
2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.799999999999997 |
2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000000000 |
2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000000006 |
2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.799999999999997 |
2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000000003 |
2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.599999999999994 |
2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000000000 |
2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000000000 |
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000000000 |
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000000000 |
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000000000 |
taos> explain verbose true select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by d desc\G;
*************************** 1.row ***************************
@@ -2673,51 +2673,51 @@ taos> select ts, c2 from d1 order by ts asc, c2 desc limit 5,5;
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc;
_wstart | d | avg(c) |
================================================================================
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 |
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 |
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 |
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 |
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 |
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 |
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 |
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 |
2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000 |
2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000 |
2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.600000000 |
2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000 |
2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000 |
2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.600000000 |
2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000 |
2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000 |
2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000 |
2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000 |
2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.800000000 |
2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000 |
2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000 |
2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000 |
2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.800000000 |
2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000 |
2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.400000000 |
2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000 |
2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000 |
2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000 |
2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000 |
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000000000 |
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000000000 |
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000000006 |
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000000000 |
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000000000 |
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000000011 |
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000000000 |
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.199999999999989 |
2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000000000 |
2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000000000 |
2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.599999999999994 |
2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000000000 |
2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000000000 |
2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.599999999999994 |
2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000000000 |
2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000000003 |
2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000000000 |
2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000000000 |
2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.799999999999997 |
2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000000006 |
2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000000000 |
2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000000000 |
2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.799999999999997 |
2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000000001 |
2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.399999999999999 |
2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000000003 |
2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000000000 |
2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000000000 |
2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000000000 |
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2;
_wstart | d | avg(c) |
================================================================================
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 |
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 |
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000000000 |
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000000000 |
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2,6;
_wstart | d | avg(c) |
================================================================================
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 |
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 |
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 |
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 |
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 |
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 |
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000000006 |
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000000000 |
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000000000 |
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000000011 |
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000000000 |
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.199999999999989 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 10;
last(ts) | d |

View File

@@ -0,0 +1,266 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
# from tmqCommon import *

class TDTestCase:
    def __init__(self):
        self.vgroups = 4
        self.ctbNum = 10
        self.rowsPerTbl = 10000
        self.duraion = '1h'

    def init(self, conn, logSql, replicaVar=1):
        self.replicaVar = int(replicaVar)
        tdLog.debug(f"start to execute {__file__}")
        tdSql.init(conn.cursor(), False)

    def create_database(self, tsql, dbName, dropFlag=1, vgroups=2, replica=1, duration: str = '1d'):
        if dropFlag == 1:
            tsql.execute("drop database if exists %s" % (dbName))
        tsql.execute("create database if not exists %s vgroups %d replica %d duration %s" % (dbName, vgroups, replica, duration))
        tdLog.debug("complete to create database %s" % (dbName))
        return

    def create_stable(self, tsql, paraDict):
        colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"])
        tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"])
        sqlString = "create table if not exists %s.%s (%s) tags (%s)" % (paraDict["dbName"], paraDict["stbName"], colString, tagString)
        tdLog.debug("%s" % (sqlString))
        tsql.execute(sqlString)
        return

    def create_ctable(self, tsql=None, dbName='dbx', stbName='stb', ctbPrefix='ctb', ctbNum=1, ctbStartIdx=0):
        for i in range(ctbNum):
            sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \
                (dbName, ctbPrefix, i+ctbStartIdx, dbName, stbName, (i+ctbStartIdx) % 5, i+ctbStartIdx, i+ctbStartIdx, i+ctbStartIdx, i+ctbStartIdx, i+ctbStartIdx)
            tsql.execute(sqlString)
        tdLog.debug("complete to create %d child tables by %s.%s" % (ctbNum, dbName, stbName))
        return
    def insert_data(self, tsql, dbName, ctbPrefix, ctbNum, rowsPerTbl, batchNum, startTs, tsStep):
        tdLog.debug("start to insert data ............")
        tsql.execute("use %s" % dbName)
        pre_insert = "insert into "
        sql = pre_insert
        for i in range(ctbNum):
            rowsBatched = 0
            sql += " %s%d values " % (ctbPrefix, i)
            for j in range(rowsPerTbl):
                if (i < ctbNum/2):
                    sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') " % (startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10, j%10, j%10)
                else:
                    sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') " % (startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10)
                rowsBatched += 1
                if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
                    tsql.execute(sql)
                    rowsBatched = 0
                    if j < rowsPerTbl - 1:
                        sql = "insert into %s%d values " % (ctbPrefix, i)
                    else:
                        sql = "insert into "
        if sql != pre_insert:
            tsql.execute(sql)
        tdLog.debug("insert data ............ [OK]")
        return
    def prepareTestEnv(self):
        tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
        paraDict = {'dbName': 'test',
                    'dropFlag': 1,
                    'vgroups': 2,
                    'stbName': 'meters',
                    'colPrefix': 'c',
                    'tagPrefix': 't',
                    'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'FLOAT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'smallint', 'count':1},{'type': 'tinyint', 'count':1},{'type': 'bool', 'count':1},{'type': 'binary', 'len':10, 'count':1},{'type': 'nchar', 'len':10, 'count':1}],
                    'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
                    'ctbPrefix': 't',
                    'ctbStartIdx': 0,
                    'ctbNum': 100,
                    'rowsPerTbl': 10000,
                    'batchNum': 3000,
                    'startTs': 1537146000000,
                    'tsStep': 600000}
        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
        paraDict['rowsPerTbl'] = self.rowsPerTbl

        tdLog.info("create database")
        self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion)

        tdLog.info("create stb")
        self.create_stable(tsql=tdSql, paraDict=paraDict)

        tdLog.info("create child tables")
        self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"],
                           stbName=paraDict["stbName"], ctbPrefix=paraDict["ctbPrefix"],
                           ctbNum=paraDict["ctbNum"], ctbStartIdx=paraDict["ctbStartIdx"])
        self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],
                         ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],
                         rowsPerTbl=paraDict["rowsPerTbl"], batchNum=paraDict["batchNum"],
                         startTs=paraDict["startTs"], tsStep=paraDict["tsStep"])
        return
    def check_first_rows(self, all_rows, limited_rows, offset: int = 0):
        for i in range(0, len(limited_rows)):
            if limited_rows[i] != all_rows[i + offset]:
                tdLog.info("row: %d, row in all: %s" % (i+offset+1, str(all_rows[i+offset])))
                tdLog.info("row: %d, row in limited: %s" % (i+1, str(limited_rows[i])))
                tdLog.exit("row data check failed")
        tdLog.info("all rows are the same as query without limit..")

    def query_and_check_with_slimit(self, sql: str, max_limit: int, step: int, offset: int = 0):
        self.query_and_check_with_limit(sql, max_limit, step, offset, ' slimit ')

    def query_and_check_with_limit(self, sql: str, max_limit: int, step: int, offset: int = 0, limit_str: str = ' limit '):
        for limit in range(0, max_limit, step):
            limited_sql = sql + limit_str + str(offset) + "," + str(limit)
            tdLog.info("query with sql: %s " % (sql) + limit_str + " %d,%d" % (offset, limit))
            all_rows = tdSql.getResult(sql)
            limited_rows = tdSql.getResult(limited_sql)
            tdLog.info("all rows: %d, limited rows: %d" % (len(all_rows), len(limited_rows)))
            if limit_str == ' limit ':
                if limit + offset <= len(all_rows) and len(limited_rows) != limit:
                    tdLog.exit("limited sql returned a wrong number of rows, \
                            limit: %d, limited_rows: %d, all_rows: %d, offset: %d" % (limit, len(limited_rows), len(all_rows), offset))
                elif limit + offset > len(all_rows) and offset < len(all_rows) and offset + len(limited_rows) != len(all_rows):
                    tdLog.exit("limited sql has fewer rows than all_rows, which is not right, \
                            limit: %d, limited_rows: %d, all_rows: %d, offset: %d" % (limit, len(limited_rows), len(all_rows), offset))
                elif offset >= len(all_rows) and len(limited_rows) != 0:
                    tdLog.exit("limited rows should be zero, \
                            limit: %d, limited_rows: %d, all_rows: %d, offset: %d" % (limit, len(limited_rows), len(all_rows), offset))
            self.check_first_rows(all_rows, limited_rows, offset)
    def test_interval_limit_asc(self, offset: int = 0):
        sqls = ["select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1s) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1m) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1h) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1d) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1s) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1m) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1h) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1d) "]
        for sql in sqls:
            self.query_and_check_with_limit(sql, 5000, 500, offset)

    def test_interval_limit_desc(self, offset: int = 0):
        sqls = ["select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1s) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1m) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1h) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1d) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1s) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1m) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1h) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1d) "]
        for sql in sqls:
            self.query_and_check_with_limit(sql, 5000, 500, offset)

    def test_interval_limit_offset(self):
        for offset in range(0, 1000, 500):
            self.test_interval_limit_asc(offset)
            self.test_interval_limit_desc(offset)
            self.test_interval_fill_limit(offset)
            self.test_interval_order_by_limit(offset)
            self.test_interval_partition_by_slimit(offset)
    def test_interval_fill_limit(self, offset: int = 0):
        sqls = [
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1s) fill(linear)",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1m) fill(linear)",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1h) fill(linear)",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1d) fill(linear)"
        ]
        for sql in sqls:
            self.query_and_check_with_limit(sql, 5000, 1000, offset)

    def test_interval_order_by_limit(self, offset: int = 0):
        sqls = [
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by b",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by a desc",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), last(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by a desc",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by count(*), sum(c1), a",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by a, count(*), sum(c1)",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by b",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by a desc",
            "select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by a desc",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by count(*), sum(c1), a",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by a, count(*), sum(c1)",
        ]
        for sql in sqls:
            self.query_and_check_with_limit(sql, 6000, 2000, offset)
    def test_interval_partition_by_slimit(self, offset: int = 0):
        sqls = [
            "select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters "
            "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by t1 interval(1m)",
            "select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters "
            "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by t1 interval(1h)",
            "select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters "
            "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by c3 interval(1m)",
        ]
        for sql in sqls:
            self.query_and_check_with_slimit(sql, 10, 2, offset)
    def test_interval_partition_by_slimit_limit(self):
        sql = "select * from (select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts),c3 from meters " \
              "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by c3 interval(1m) slimit 10 limit 2) order by c3 asc"
        tdSql.query(sql)
        tdSql.checkRows(20)
        tdSql.checkData(0, 4, 0)
        tdSql.checkData(1, 4, 0)
        tdSql.checkData(2, 4, 1)
        tdSql.checkData(3, 4, 1)
        tdSql.checkData(18, 4, 9)
        tdSql.checkData(19, 4, 9)

        sql = "select * from (select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts),c3 from meters " \
              "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by c3 interval(1m) slimit 2,2 limit 2) order by c3 asc"
        tdSql.query(sql)
        tdSql.checkRows(4)
        tdSql.checkData(0, 4, 2)
        tdSql.checkData(1, 4, 2)
        tdSql.checkData(2, 4, 9)
        tdSql.checkData(3, 4, 9)
    def run(self):
        self.prepareTestEnv()
        self.test_interval_limit_offset()
        self.test_interval_partition_by_slimit_limit()

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

event = threading.Event()

tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())