Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0
This commit is contained in:
commit
30f3fa29b9
|
@ -202,7 +202,7 @@ SELECT ... FROM table_name1 LEFT|RIGHT ASOF JOIN table_name2 [ON ...] [JLIMIT jl
|
|||
|
||||
表 d1001 电压值大于 220V 且表 d1002 中同一时刻或稍早前最后时刻出现电压大于 220V 的时间及各自的电压值:
|
||||
```sql
|
||||
SELECT a.ts, a.voltage, a.ts, b.voltage FROM d1001 a LEFT ASOF JOIN d1002 b ON a.ts >= b.ts where a.voltage > 220 and b.voltage > 220
|
||||
SELECT a.ts, a.voltage, b.ts, b.voltage FROM d1001 a LEFT ASOF JOIN d1002 b ON a.ts >= b.ts where a.voltage > 220 and b.voltage > 220
|
||||
```
|
||||
|
||||
### Left/Right Window Join
|
||||
|
|
|
@ -2141,6 +2141,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|||
taosWUnLockLatch(&tmq->lock);
|
||||
}
|
||||
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
||||
tmqFreeRspWrapper(pRspWrapper);
|
||||
taosFreeQitem(pRspWrapper);
|
||||
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
|
||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
||||
|
@ -2844,6 +2845,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
pWrapper->epoch = head->epoch;
|
||||
(void)memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
|
||||
if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg) == NULL){
|
||||
tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
|
||||
taosFreeQitem(pWrapper);
|
||||
}else{
|
||||
(void)taosWriteQitem(tmq->mqueue, pWrapper);
|
||||
|
|
|
@ -10140,6 +10140,7 @@ void *tDecodeMqSubTopicEp(void *buf, SMqSubTopicEp *pTopicEp) {
|
|||
buf = tDecodeSMqSubVgEp(buf, &vgEp);
|
||||
if (taosArrayPush(pTopicEp->vgs, &vgEp) == NULL) {
|
||||
taosArrayDestroy(pTopicEp->vgs);
|
||||
pTopicEp->vgs = NULL;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -293,7 +293,7 @@ int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
|
|||
(void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to create notify thread since %s", strerror(code));
|
||||
dError("failed to create notify thread since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -443,7 +443,7 @@ static int32_t mndInitTimer(SMnode *pMnode) {
|
|||
(void)taosThreadAttrInit(&thAttr);
|
||||
(void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if ((code = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode)) != 0) {
|
||||
mError("failed to create timer thread since %s", strerror(errno));
|
||||
mError("failed to create timer thread since %s", tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
|
|
|
@ -2420,7 +2420,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
|||
if (pStream != NULL) { // TODO:handle error
|
||||
code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
|
||||
if (code) {
|
||||
mError("failed to create checkpoint trans, code:%s", strerror(code));
|
||||
mError("failed to create checkpoint trans, code:%s", tstrerror(code));
|
||||
}
|
||||
} else {
|
||||
// todo: wait for the create stream trans completed, and launch the checkpoint trans
|
||||
|
|
|
@ -1069,6 +1069,9 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
|
|||
|
||||
while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
|
||||
SChkptReportInfo* px = (SChkptReportInfo *)pIter;
|
||||
if (taosArrayGetSize(px->pTaskList) == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0);
|
||||
if (pInfo == NULL) {
|
||||
|
|
|
@ -417,7 +417,6 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
return code;
|
||||
} else {
|
||||
tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
|
||||
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4708,8 +4708,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
|
|||
pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
|
||||
if (pReader->pSchemaMap == NULL) {
|
||||
tsdbError("failed init schema hash for reader %s", pReader->idStr);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TSDB_CHECK_NULL(pReader->pSchemaMap, code, lino, _err, terrno);
|
||||
}
|
||||
|
||||
tSimpleHashSetFreeFp(pReader->pSchemaMap, freeSchemaFunc);
|
||||
|
|
|
@ -390,11 +390,13 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
|
|||
}
|
||||
|
||||
initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
|
||||
pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);
|
||||
if (pInfo->self < 0) {
|
||||
int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
|
||||
if (refId < 0) {
|
||||
int32_t code = terrno;
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
return code;
|
||||
} else {
|
||||
pInfo->self = refId;
|
||||
}
|
||||
|
||||
return initDataSource(numOfSources, pInfo, id);
|
||||
|
|
|
@ -758,7 +758,10 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
|
|||
|
||||
SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index);
|
||||
QUERY_CHECK_NULL(pv, code, lino, _end, terrno);
|
||||
nodesValueNodeToVariant(pv, &pFillCol[i].fillVal);
|
||||
code = nodesValueNodeToVariant(pv, &pFillCol[i].fillVal);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
pFillCol->numOfFillExpr = numOfFillExpr;
|
||||
|
|
|
@ -247,7 +247,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
|
|||
|
||||
} else {
|
||||
stError("failed to start stream backend at %s, reason: %s, restart from default state dir:%s", chkp,
|
||||
tstrerror(TAOS_SYSTEM_ERROR(errno)), state);
|
||||
tstrerror(terrno), state);
|
||||
code = taosMkDir(state);
|
||||
if (code != 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
|
|
|
@ -558,10 +558,17 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
|
||||
pInfo->checkpointTime, pReq->checkpointTs);
|
||||
} else { // not in restore status, must be in checkpoint status
|
||||
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
|
||||
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
|
||||
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
|
||||
pInfo->checkpointTime, pReq->checkpointTs);
|
||||
if (pStatus.state == TASK_STATUS__CK) {
|
||||
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
|
||||
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
|
||||
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
|
||||
pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs);
|
||||
} else {
|
||||
stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
|
||||
" checkpointVer:%" PRId64 "->%" PRId64,
|
||||
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
|
||||
pReq->checkpointVer);
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
|
||||
|
@ -573,12 +580,11 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
pInfo->checkpointVer = pReq->checkpointVer;
|
||||
pInfo->checkpointTime = pReq->checkpointTs;
|
||||
|
||||
streamTaskClearCheckInfo(pTask, true);
|
||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
||||
} else {
|
||||
stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus.name);
|
||||
}
|
||||
|
||||
streamTaskClearCheckInfo(pTask, true);
|
||||
|
||||
if (pReq->dropRelHTask) {
|
||||
stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
|
||||
pReq->taskId, vgId, pReq->hTaskId);
|
||||
|
|
|
@ -1083,7 +1083,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
|
|||
|
||||
int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
|
||||
void* buf = NULL;
|
||||
int32_t code = -1;
|
||||
int32_t code = 0;
|
||||
SRpcMsg msg = {0};
|
||||
|
||||
// serialize
|
||||
|
@ -1093,9 +1093,9 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in
|
|||
goto FAIL;
|
||||
}
|
||||
|
||||
code = -1;
|
||||
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||
if (buf == NULL) {
|
||||
code = terrno;
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -1119,6 +1119,10 @@ FAIL:
|
|||
rpcFreeCont(buf);
|
||||
}
|
||||
|
||||
if (code == -1) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -295,12 +295,12 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
|
|||
if (code) {
|
||||
stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, tstrerror(code));
|
||||
}
|
||||
|
||||
streamMetaRUnLock(pMeta);
|
||||
|
||||
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
|
||||
"meta-hb-tmr");
|
||||
|
||||
code = taosReleaseRef(streamMetaId, rid);
|
||||
|
||||
if (code) {
|
||||
stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid);
|
||||
}
|
||||
|
|
|
@ -107,7 +107,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
|
|||
int32_t code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK);
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
if (code) {
|
||||
stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, strerror(code), ref);
|
||||
stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, tstrerror(code), ref);
|
||||
} else {
|
||||
stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime,
|
||||
ref);
|
||||
|
|
|
@ -1287,7 +1287,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
|||
}
|
||||
|
||||
// tools
|
||||
(void)syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr); // TODO: check return value
|
||||
(void)syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr); // TODO: check return value
|
||||
if (pSyncNode->pSyncRespMgr == NULL) {
|
||||
sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
|
||||
goto _error;
|
||||
|
@ -1407,7 +1407,8 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
|
|||
pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
|
||||
sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
|
||||
|
||||
if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE && (code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex)) < 0) {
|
||||
if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
|
||||
(code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex)) < 0) {
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
|
@ -2187,7 +2188,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||
ASSERT(lastIndex >= 0);
|
||||
// ASSERT(lastIndex >= 0);
|
||||
sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
|
||||
raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
|
||||
}
|
||||
|
|
|
@ -892,7 +892,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
|
|||
|
||||
if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) {
|
||||
code = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1, &term);
|
||||
if (term < 0 && (errno == ENFILE || errno == EMFILE)) {
|
||||
if (term < 0 && (errno == ENFILE || errno == EMFILE || errno == ENOENT)) {
|
||||
sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64, pNode->vgId, tstrerror(code), index + 1);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ class TDTestCase(TBase):
|
|||
"enableCoreFile 1",
|
||||
"fqdn 127.0.0.1",
|
||||
"firstEp 127.0.0.1",
|
||||
"locale ENG",
|
||||
"locale en_US.UTF-8",
|
||||
"metaCacheMaxSize 10000",
|
||||
"minimalTmpDirGB 5",
|
||||
"minimalLogDirGB 1",
|
||||
|
|
|
@ -27,9 +27,9 @@
|
|||
,,y,army,./pytest.sh python3 ./test.py -f insert/insert_basic.py -N 3
|
||||
,,y,army,./pytest.sh python3 ./test.py -f cluster/splitVgroupByLearner.py -N 3
|
||||
,,y,army,./pytest.sh python3 ./test.py -f authorith/authBasic.py -N 3
|
||||
# ,,n,army,python3 ./test.py -f cmdline/fullopt.py
|
||||
,,n,army,python3 ./test.py -f query/show.py -N 3
|
||||
,,n,army,python3 ./test.py -f alter/alterConfig.py -N 3
|
||||
,,n,army,python3 ./test.py -f cmdline/fullopt.py
|
||||
,,y,army,./pytest.sh python3 ./test.py -f query/show.py -N 3
|
||||
,,y,army,./pytest.sh python3 ./test.py -f alter/alterConfig.py -N 3
|
||||
,,y,army,./pytest.sh python3 ./test.py -f query/subquery/subqueryBugs.py -N 3
|
||||
,,y,army,./pytest.sh python3 ./test.py -f storage/oneStageComp.py -N 3 -L 3 -D 1
|
||||
,,y,army,./pytest.sh python3 ./test.py -f storage/compressBasic.py -N 3
|
||||
|
@ -163,6 +163,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/stt_blocks_check.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/agg_null.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 2
|
||||
|
@ -280,8 +281,8 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3 -i True
|
||||
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-db-removewal.py -N 2 -n 1
|
||||
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb-removewal.py -N 6 -n 3
|
||||
#,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 2 -n 1
|
||||
#,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 6 -n 3
|
||||
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 2 -n 1
|
||||
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 6 -n 3
|
||||
#,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-db.py -N 6 -n 3
|
||||
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select.py -N 2 -n 1
|
||||
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py -N 3 -n 3
|
||||
|
|
|
@ -9,3 +9,4 @@ requests
|
|||
pexpect
|
||||
faker
|
||||
pyopenssl
|
||||
hyperloglog
|
|
@ -0,0 +1,136 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import numpy as np
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
from util.common import *
|
||||
from util.sqlset import *
|
||||
from hyperloglog import HyperLogLog
|
||||
'''
|
||||
Test case for TS-5150
|
||||
'''
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor())
|
||||
self.ts = 1537146000000
|
||||
def initdabase(self):
|
||||
tdSql.execute('create database if not exists db_test vgroups 2 buffer 10')
|
||||
tdSql.execute('use db_test')
|
||||
tdSql.execute('create stable stb(ts timestamp, delay int) tags(groupid int)')
|
||||
tdSql.execute('create table t1 using stb tags(1)')
|
||||
tdSql.execute('create table t2 using stb tags(2)')
|
||||
tdSql.execute('create table t3 using stb tags(3)')
|
||||
tdSql.execute('create table t4 using stb tags(4)')
|
||||
tdSql.execute('create table t5 using stb tags(5)')
|
||||
tdSql.execute('create table t6 using stb tags(6)')
|
||||
def insert_data(self):
|
||||
for i in range(5000):
|
||||
tdSql.execute(f"insert into t1 values({self.ts + i * 1000}, {i%5})")
|
||||
tdSql.execute(f"insert into t2 values({self.ts + i * 1000}, {i%5})")
|
||||
tdSql.execute(f"insert into t3 values({self.ts + i * 1000}, {i%5})")
|
||||
|
||||
def verify_agg_null(self):
|
||||
for i in range(20):
|
||||
col_val_list = []
|
||||
tdSql.query(f'select CASE WHEN delay != 0 THEN delay ELSE NULL END from stb where ts between {1537146000000 + i * 1000} and {1537146000000 + (i+10) * 1000}')
|
||||
for col_va in tdSql.queryResult:
|
||||
if col_va[0] is not None:
|
||||
col_val_list.append(col_va[0])
|
||||
tdSql.query(f'SELECT APERCENTILE(CASE WHEN delay != 0 THEN delay ELSE NULL END,50) AS apercentile,\
|
||||
MAX(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS maxDelay,\
|
||||
MIN(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS minDelay,\
|
||||
AVG(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS avgDelay,\
|
||||
STDDEV(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS jitter,\
|
||||
COUNT(CASE WHEN delay = 0 THEN 1 ELSE NULL END) AS timeoutCount,\
|
||||
COUNT(*) AS totalCount ,\
|
||||
ELAPSED(ts) AS elapsed_time,\
|
||||
SPREAD(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS spread,\
|
||||
SUM(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS sum,\
|
||||
HYPERLOGLOG(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS hyperloglog from stb where ts between {1537146000000 + i * 1000} and {1537146000000 + (i+10) * 1000}')
|
||||
#verify apercentile
|
||||
apercentile_res = tdSql.queryResult[0][0]
|
||||
approximate_median = np.percentile(col_val_list, 50)
|
||||
assert np.abs(apercentile_res - approximate_median) < 1
|
||||
#verify max
|
||||
max_res = tdSql.queryResult[0][1]
|
||||
tdSql.checkEqual(max_res,max(col_val_list))
|
||||
#verify min
|
||||
min_res = tdSql.queryResult[0][2]
|
||||
tdSql.checkEqual(min_res,min(col_val_list))
|
||||
#verify avg
|
||||
avg_res = tdSql.queryResult[0][3]
|
||||
tdSql.checkEqual(avg_res,np.average(col_val_list))
|
||||
#verify stddev
|
||||
stddev_res = tdSql.queryResult[0][4]
|
||||
assert np.abs(stddev_res - np.std(col_val_list)) < 0.0001
|
||||
#verify count of 0 + count of !0 == count(*)
|
||||
count_res = tdSql.queryResult[0][6]
|
||||
tdSql.checkEqual(count_res,len(col_val_list)+tdSql.queryResult[0][5])
|
||||
#verify elapsed
|
||||
elapsed_res = tdSql.queryResult[0][7]
|
||||
assert elapsed_res == 10000
|
||||
#verify spread
|
||||
spread_res = tdSql.queryResult[0][8]
|
||||
tdSql.checkEqual(spread_res,max(col_val_list) - min(col_val_list))
|
||||
#verify sum
|
||||
sum_res = tdSql.queryResult[0][9]
|
||||
tdSql.checkEqual(sum_res,sum(col_val_list))
|
||||
#verify hyperloglog
|
||||
error_rate = 0.01
|
||||
hll = HyperLogLog(error_rate)
|
||||
for col_val in col_val_list:
|
||||
hll.add(col_val)
|
||||
hll_res = tdSql.queryResult[0][10]
|
||||
assert np.abs(hll_res - hll.card()) < 0.01
|
||||
#verify leastsquares
|
||||
tdSql.query(f'SELECT leastsquares(CASE WHEN delay != 0 THEN delay ELSE NULL END,1,1) from stb where ts between {1537146000000 + i * 1000} and {1537146000000 + (i+10) * 1000}')
|
||||
cleaned_data = tdSql.queryResult[0][0].strip('{}').replace(' ', '')
|
||||
pairs = cleaned_data.split(',')
|
||||
slope = None
|
||||
intercept = None
|
||||
for pair in pairs:
|
||||
key, value = pair.split(':')
|
||||
key = key.strip()
|
||||
value = float(value.strip())
|
||||
if key == 'slop':
|
||||
slope = value
|
||||
elif key == 'intercept':
|
||||
intercept = value
|
||||
assert slope != 0
|
||||
assert intercept != 0
|
||||
#verify histogram
|
||||
tdSql.query(f'SELECT histogram(CASE WHEN delay != 0 THEN delay ELSE NULL END, "user_input", "[1,3,5,7]", 1) from stb where ts between {1537146000000 + i * 1000} and {1537146000000 + (i+10) * 1000}')
|
||||
cleaned_data = tdSql.queryResult[0][0].strip('{}').replace(' ', '')
|
||||
pairs = cleaned_data.split(',')
|
||||
count = None
|
||||
for pair in pairs:
|
||||
key, value = pair.split(':')
|
||||
key = key.strip()
|
||||
if key == 'count':
|
||||
count = float(value.strip())
|
||||
assert count != 0
|
||||
def run(self):
|
||||
self.initdabase()
|
||||
self.insert_data()
|
||||
self.verify_agg_null()
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue