Merge branch '3.0' into feature/TD-11274-3.0
This commit is contained in:
commit
1e47de0523
|
@ -1033,6 +1033,7 @@ typedef struct {
|
|||
|
||||
// for tsma
|
||||
int8_t isTsma;
|
||||
void* pTsma;
|
||||
|
||||
} SCreateVnodeReq;
|
||||
|
||||
|
|
|
@ -343,7 +343,7 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF
|
||||
|
||||
#define TSDB_MIN_ROLLUP_FILE_FACTOR 0
|
||||
#define TSDB_MAX_ROLLUP_FILE_FACTOR 1
|
||||
#define TSDB_MAX_ROLLUP_FILE_FACTOR 10
|
||||
#define TSDB_DEFAULT_ROLLUP_FILE_FACTOR 0.1
|
||||
#define TSDB_MIN_TABLE_TTL 0
|
||||
#define TSDB_DEFAULT_TABLE_TTL 0
|
||||
|
|
|
@ -378,14 +378,16 @@ static FORCE_INLINE int32_t tDecodeDouble(SDecoder* pCoder, double* val) {
|
|||
}
|
||||
|
||||
static FORCE_INLINE int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint32_t* len) {
|
||||
if (tDecodeU32v(pCoder, len) < 0) return -1;
|
||||
uint32_t length = 0;
|
||||
if (tDecodeU32v(pCoder, &length) < 0) return -1;
|
||||
if (len) *len = length;
|
||||
|
||||
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, *len)) return -1;
|
||||
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1;
|
||||
if (val) {
|
||||
*val = (uint8_t*)TD_CODER_CURRENT(pCoder);
|
||||
}
|
||||
|
||||
TD_CODER_MOVE_POS(pCoder, *len);
|
||||
TD_CODER_MOVE_POS(pCoder, length);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -410,14 +412,16 @@ static int32_t tDecodeCStrTo(SDecoder* pCoder, char* val) {
|
|||
}
|
||||
|
||||
static FORCE_INLINE int32_t tDecodeBinaryAlloc(SDecoder* pCoder, void** val, uint64_t* len) {
|
||||
if (tDecodeU64v(pCoder, len) < 0) return -1;
|
||||
uint64_t length = 0;
|
||||
if (tDecodeU64v(pCoder, &length) < 0) return -1;
|
||||
if (len) *len = length;
|
||||
|
||||
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, *len)) return -1;
|
||||
*val = taosMemoryMalloc(*len);
|
||||
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1;
|
||||
*val = taosMemoryMalloc(length);
|
||||
if (*val == NULL) return -1;
|
||||
memcpy(*val, TD_CODER_CURRENT(pCoder), *len);
|
||||
memcpy(*val, TD_CODER_CURRENT(pCoder), length);
|
||||
|
||||
TD_CODER_MOVE_POS(pCoder, *len);
|
||||
TD_CODER_MOVE_POS(pCoder, length);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -2973,6 +2973,11 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
|
|||
}
|
||||
|
||||
if (tEncodeI8(&encoder, pReq->isTsma) < 0) return -1;
|
||||
if (pReq->isTsma) {
|
||||
uint32_t tsmaLen = (uint32_t)(htonl(((SMsgHead *)pReq->pTsma)->contLen));
|
||||
if (tEncodeBinary(&encoder, (const uint8_t *)pReq->pTsma, tsmaLen) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -3036,6 +3041,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
|
|||
}
|
||||
|
||||
if (tDecodeI8(&decoder, &pReq->isTsma) < 0) return -1;
|
||||
if (pReq->isTsma) {
|
||||
if (tDecodeBinaryAlloc(&decoder, &pReq->pTsma, NULL) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -3045,6 +3053,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
|
|||
int32_t tFreeSCreateVnodeReq(SCreateVnodeReq *pReq) {
|
||||
taosArrayDestroy(pReq->pRetensions);
|
||||
pReq->pRetensions = NULL;
|
||||
if(pReq->isTsma) {
|
||||
taosMemoryFreeClear(pReq->pTsma);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -140,6 +140,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
|||
pCfg->szCache = pCreate->pages;
|
||||
pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
|
||||
pCfg->isWeak = true;
|
||||
pCfg->isTsma = pCreate->isTsma;
|
||||
pCfg->tsdbCfg.compression = pCreate->compression;
|
||||
pCfg->tsdbCfg.precision = pCreate->precision;
|
||||
pCfg->tsdbCfg.days = pCreate->daysPerFile;
|
||||
|
@ -209,7 +210,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
|
||||
if (pImpl == NULL) {
|
||||
dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
|
||||
dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
|
|
@ -344,6 +344,7 @@ typedef struct {
|
|||
int8_t isTsma;
|
||||
int8_t replica;
|
||||
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
|
||||
void* pTsma;
|
||||
} SVgObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -409,7 +409,8 @@ static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
||||
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
|
||||
SSmaObj *pSma) {
|
||||
SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||
if (pDnode == NULL) return -1;
|
||||
|
@ -419,9 +420,14 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
|
|||
mndReleaseDnode(pMnode, pDnode);
|
||||
|
||||
// todo add sma info here
|
||||
int32_t smaContLen = 0;
|
||||
void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
|
||||
if (pSmaReq == NULL) return -1;
|
||||
pVgroup->pTsma = pSmaReq;
|
||||
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
|
||||
taosMemoryFreeClear(pSmaReq);
|
||||
if (pReq == NULL) return -1;
|
||||
|
||||
action.pCont = pReq;
|
||||
|
@ -514,7 +520,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
|
||||
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
|
|
|
@ -218,6 +218,8 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
|||
createReq.hashMethod = pDb->cfg.hashMethod;
|
||||
createReq.numOfRetensions = pDb->cfg.numOfRetensions;
|
||||
createReq.pRetensions = pDb->cfg.pRetensions;
|
||||
createReq.isTsma = pVgroup->isTsma;
|
||||
createReq.pTsma = pVgroup->pTsma;
|
||||
|
||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||
SReplica *pReplica = &createReq.replicas[v];
|
||||
|
|
|
@ -171,12 +171,13 @@ struct SVnodeCfg {
|
|||
uint64_t szBuf;
|
||||
bool isHeap;
|
||||
bool isWeak;
|
||||
int8_t isTsma;
|
||||
int8_t hashMethod;
|
||||
STsdbCfg tsdbCfg;
|
||||
SWalCfg walCfg;
|
||||
SSyncCfg syncCfg;
|
||||
uint32_t hashBegin;
|
||||
uint32_t hashEnd;
|
||||
int8_t hashMethod;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -414,7 +414,7 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
|
|||
}
|
||||
taosMemoryFreeClear(pReq);
|
||||
} else {
|
||||
smaWarn("vgId:%d no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
||||
smaDebug("vgId:%d no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
||||
}
|
||||
|
||||
taosArrayDestroy(pResult);
|
||||
|
|
|
@ -56,6 +56,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
|||
if (tjsonAddIntegerToObject(pJson, "szBuf", pCfg->szBuf) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeap) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "isTsma", pCfg->isTsma) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
|
||||
|
@ -130,6 +131,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
|||
if(code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak, code);
|
||||
if(code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "isTsma", pCfg->isTsma, code);
|
||||
if(code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision, code);
|
||||
if(code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update, code);
|
||||
|
|
|
@ -97,7 +97,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
}
|
||||
|
||||
// open tsdb
|
||||
if (!vnodeIsRollup(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, TSDB_TYPE_TSDB) < 0) {
|
||||
if (!vnodeIsRollup(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL) < 0) {
|
||||
vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
|
|
@ -1754,6 +1754,11 @@ int32_t leastSQRFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_NULL: {
|
||||
GET_RES_INFO(pCtx)->isNullRes = 1;
|
||||
numOfElem = 1;
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
|
@ -1797,7 +1802,7 @@ int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
size_t len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%.6lf, intercept:%.6lf}", param[0][2], param[1][2]);
|
||||
varDataSetLen(buf, len);
|
||||
|
||||
colDataAppend(pCol, currentRow, buf, false);
|
||||
colDataAppend(pCol, currentRow, buf, pResInfo->isNullRes);
|
||||
|
||||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ typedef struct MemTable {
|
|||
typedef struct IndexCache {
|
||||
T_REF_DECLARE()
|
||||
MemTable *mem, *imm;
|
||||
int32_t merging;
|
||||
SIndex* index;
|
||||
char* colName;
|
||||
int64_t version;
|
||||
|
|
|
@ -462,7 +462,10 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
|
|||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
IndexCache* pCache = (IndexCache*)cache;
|
||||
IndexCache* pCache = (IndexCache*)cache;
|
||||
|
||||
while (sIdx->quit && atomic_load_32(&pCache->merging) == 1) {
|
||||
}
|
||||
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
|
||||
if (pReader == NULL) {
|
||||
indexWarn("empty tfile reader found");
|
||||
|
@ -475,9 +478,9 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
|
|||
tfileReaderUnRef(pReader);
|
||||
if (sIdx->quit) {
|
||||
indexPost(sIdx);
|
||||
// indexCacheBroadcast(pCache);
|
||||
}
|
||||
indexReleaseRef(sIdx->refId);
|
||||
atomic_store_32(&pCache->merging, 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -539,6 +542,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
|
|||
if (sIdx->quit) {
|
||||
indexPost(sIdx);
|
||||
}
|
||||
atomic_store_32(&pCache->merging, 0);
|
||||
indexReleaseRef(sIdx->refId);
|
||||
|
||||
return ret;
|
||||
|
@ -605,6 +609,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
|||
taosThreadMutexLock(&tf->mtx);
|
||||
tfileCachePut(tf->cache, &key, reader);
|
||||
taosThreadMutexUnlock(&tf->mtx);
|
||||
|
||||
return ret;
|
||||
END:
|
||||
if (tw != NULL) {
|
||||
|
|
|
@ -494,16 +494,19 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
|
|||
// TODO: wake up by condition variable
|
||||
indexCacheWait(cache);
|
||||
} else {
|
||||
bool notifyQuit = cache->occupiedMem >= MEM_SIGNAL_QUIT ? true : false;
|
||||
bool quit = cache->occupiedMem >= MEM_SIGNAL_QUIT ? true : false;
|
||||
|
||||
indexCacheRef(cache);
|
||||
cache->imm = cache->mem;
|
||||
cache->mem = indexInternalCacheCreate(cache->type);
|
||||
cache->mem->pCache = cache;
|
||||
cache->occupiedMem = 0;
|
||||
if (quit == false) {
|
||||
atomic_store_32(&cache->merging, 1);
|
||||
}
|
||||
// sched to merge
|
||||
// unref cache in bgwork
|
||||
indexCacheSchedToMerge(cache, notifyQuit);
|
||||
indexCacheSchedToMerge(cache, quit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,431 @@
|
|||
from math import floor
|
||||
from random import randint, random
|
||||
from numpy import equal
|
||||
import taos
|
||||
import sys
|
||||
import datetime
|
||||
import inspect
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
|
||||
class TDTestCase:
|
||||
updatecfgDict = {'debugFlag': 143 ,"cDebugFlag":143,"uDebugFlag":143 ,"rpcDebugFlag":143 , "tmrDebugFlag":143 ,
|
||||
"jniDebugFlag":143 ,"simDebugFlag":143,"dDebugFlag":143, "dDebugFlag":143,"vDebugFlag":143,"mDebugFlag":143,"qDebugFlag":143,
|
||||
"wDebugFlag":143,"sDebugFlag":143,"tsdbDebugFlag":143,"tqDebugFlag":143 ,"fsDebugFlag":143 ,"fnDebugFlag":143}
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
|
||||
def prepare_datas(self):
|
||||
tdSql.execute(
|
||||
'''create table stb1
|
||||
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
|
||||
tags (t1 int)
|
||||
'''
|
||||
)
|
||||
|
||||
tdSql.execute(
|
||||
'''
|
||||
create table t1
|
||||
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
|
||||
'''
|
||||
)
|
||||
for i in range(4):
|
||||
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
|
||||
|
||||
for i in range(9):
|
||||
tdSql.execute(
|
||||
f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
|
||||
)
|
||||
tdSql.execute(
|
||||
f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
|
||||
)
|
||||
tdSql.execute("insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )")
|
||||
tdSql.execute("insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
|
||||
tdSql.execute("insert into ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )")
|
||||
tdSql.execute("insert into ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
|
||||
|
||||
tdSql.execute("insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
|
||||
tdSql.execute("insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
|
||||
tdSql.execute("insert into ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
|
||||
|
||||
tdSql.execute(
|
||||
f'''insert into t1 values
|
||||
( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
|
||||
( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
|
||||
( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
|
||||
( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
|
||||
( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a )
|
||||
( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
|
||||
( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
|
||||
( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
|
||||
( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
|
||||
( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
|
||||
( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
|
||||
( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
|
||||
'''
|
||||
)
|
||||
|
||||
def test_errors(self):
|
||||
error_sql_lists = [
|
||||
# "select stateduration(c1,'GT',5,1s) from t1"
|
||||
"select stateduration from t1",
|
||||
"select stateduration(123--123)==1 from t1",
|
||||
"select stateduration(123,123) from t1",
|
||||
"select stateduration(c1,ts) from t1",
|
||||
"select stateduration(c1,c1,ts) from t1",
|
||||
"select stateduration(c1 ,c2 ) from t1",
|
||||
"select stateduration(c1 ,NULL) from t1",
|
||||
#"select stateduration(c1 ,'NULL',1.0,1s) from t1",
|
||||
"select stateduration(c1 ,'GT','1',1s) from t1",
|
||||
"select stateduration(c1 ,'GT','tbname',1s) from t1",
|
||||
"select stateduration(c1 ,'GT','*',1s) from t1",
|
||||
"select stateduration(c1 ,'GT',ts,1s) from t1",
|
||||
"select stateduration(c1 ,'GT',max(c1),1s) from t1",
|
||||
"select stateduration(abs(c1) ,'GT',1,1s) from t1",
|
||||
"select stateduration(c1+2 ,'GT',1,1s) from t1",
|
||||
"select stateduration(c1 ,'GT',1,1u) from t1",
|
||||
"select stateduration(c1 ,'GT',1,now) from t1",
|
||||
"select stateduration(c1 ,'GT','1',1s) from t1",
|
||||
"select stateduration(c1 ,'GT','1',True) from t1",
|
||||
"select stateduration(stateduration(c1) ab from t1)",
|
||||
"select stateduration(c1 ,'GT',1,,)int from t1",
|
||||
"select stateduration('c1','GT',1) from t1",
|
||||
"select stateduration('c1','GT', 1 , NULL) from t1",
|
||||
"select stateduration('c1','GT', 1 , '') from t1",
|
||||
"select stateduration('c1','GT', 1 ,c%) from t1",
|
||||
"select stateduration(c1 ,'GT',1,t1) from t1",
|
||||
"select stateduration(c1 ,'GT',1,True) from t1",
|
||||
"select stateduration(c1 ,'GT',1,1s) , count(c1) from t1",
|
||||
"select stateduration(c1 ,'GT',1,1s) , avg(c1) from t1",
|
||||
"select stateduration(c1 ,'GT',1,1s) , min(c1) from t1",
|
||||
"select stateduration(c1 ,'GT',1,1s) , spread(c1) from t1",
|
||||
"select stateduration(c1 ,'GT',1,1s) , diff(c1) from t1",
|
||||
"select stateduration(c1 ,'GT',1,1s) , abs(c1) from t1",
|
||||
"select stateduration(c1 ,'GT',1,1s) , c1 from t1",
|
||||
]
|
||||
for error_sql in error_sql_lists:
|
||||
tdSql.error(error_sql)
|
||||
pass
|
||||
|
||||
def support_types(self):
|
||||
other_no_value_types = [
|
||||
"select stateduration(ts,'GT',1,1s) from t1" ,
|
||||
"select stateduration(c7,'GT',1,1s) from t1",
|
||||
"select stateduration(c8,'GT',1,1s) from t1",
|
||||
"select stateduration(c9,'GT',1,1s) from t1",
|
||||
"select stateduration(ts,'GT',1,1s) from ct1" ,
|
||||
"select stateduration(c7,'GT',1,1s) from ct1",
|
||||
"select stateduration(c8,'GT',1,1s) from ct1",
|
||||
"select stateduration(c9,'GT',1,1s) from ct1",
|
||||
"select stateduration(ts,'GT',1,1s) from ct3" ,
|
||||
"select stateduration(c7,'GT',1,1s) from ct3",
|
||||
"select stateduration(c8,'GT',1,1s) from ct3",
|
||||
"select stateduration(c9,'GT',1,1s) from ct3",
|
||||
"select stateduration(ts,'GT',1,1s) from ct4" ,
|
||||
"select stateduration(c7,'GT',1,1s) from ct4",
|
||||
"select stateduration(c8,'GT',1,1s) from ct4",
|
||||
"select stateduration(c9,'GT',1,1s) from ct4",
|
||||
"select stateduration(ts,'GT',1,1s) from stb1 partition by tbname" ,
|
||||
"select stateduration(c7,'GT',1,1s) from stb1 partition by tbname",
|
||||
"select stateduration(c8,'GT',1,1s) from stb1 partition by tbname",
|
||||
"select stateduration(c9,'GT',1,1s) from stb1 partition by tbname"
|
||||
]
|
||||
|
||||
for type_sql in other_no_value_types:
|
||||
tdSql.error(type_sql)
|
||||
tdLog.info("support type ok , sql is : %s"%type_sql)
|
||||
|
||||
type_sql_lists = [
|
||||
"select stateduration(c1,'GT',1,1s) from t1",
|
||||
"select stateduration(c2,'GT',1,1s) from t1",
|
||||
"select stateduration(c3,'GT',1,1s) from t1",
|
||||
"select stateduration(c4,'GT',1,1s) from t1",
|
||||
"select stateduration(c5,'GT',1,1s) from t1",
|
||||
"select stateduration(c6,'GT',1,1s) from t1",
|
||||
|
||||
"select stateduration(c1,'GT',1,1s) from ct1",
|
||||
"select stateduration(c2,'GT',1,1s) from ct1",
|
||||
"select stateduration(c3,'GT',1,1s) from ct1",
|
||||
"select stateduration(c4,'GT',1,1s) from ct1",
|
||||
"select stateduration(c5,'GT',1,1s) from ct1",
|
||||
"select stateduration(c6,'GT',1,1s) from ct1",
|
||||
|
||||
"select stateduration(c1,'GT',1,1s) from ct3",
|
||||
"select stateduration(c2,'GT',1,1s) from ct3",
|
||||
"select stateduration(c3,'GT',1,1s) from ct3",
|
||||
"select stateduration(c4,'GT',1,1s) from ct3",
|
||||
"select stateduration(c5,'GT',1,1s) from ct3",
|
||||
"select stateduration(c6,'GT',1,1s) from ct3",
|
||||
|
||||
"select stateduration(c1,'GT',1,1s) from stb1 partition by tbname",
|
||||
"select stateduration(c2,'GT',1,1s) from stb1 partition by tbname",
|
||||
"select stateduration(c3,'GT',1,1s) from stb1 partition by tbname",
|
||||
"select stateduration(c4,'GT',1,1s) from stb1 partition by tbname",
|
||||
"select stateduration(c5,'GT',1,1s) from stb1 partition by tbname",
|
||||
"select stateduration(c6,'GT',1,1s) from stb1 partition by tbname",
|
||||
|
||||
"select stateduration(c6,'GT',1,1s) as alisb from stb1 partition by tbname",
|
||||
"select stateduration(c6,'GT',1,1s) alisb from stb1 partition by tbname",
|
||||
]
|
||||
|
||||
for type_sql in type_sql_lists:
|
||||
tdSql.query(type_sql)
|
||||
|
||||
def support_opers(self):
|
||||
oper_lists = ['LT','lt','Lt','lT','GT','gt','Gt','gT','LE','le','Le','lE','GE','ge','Ge','gE','NE','ne','Ne','nE','EQ','eq','Eq','eQ']
|
||||
|
||||
oper_errors = [",","*","NULL","tbname","ts","sum","_c0"]
|
||||
|
||||
for oper in oper_lists:
|
||||
tdSql.query(f"select stateduration(c1 ,'{oper}',1,1s) as col from t1")
|
||||
tdSql.checkRows(12)
|
||||
|
||||
for oper in oper_errors:
|
||||
tdSql.error(f"select stateduration(c1 ,'{oper}',1,1s) as col from t1")
|
||||
|
||||
|
||||
def basic_stateduration_function(self):
|
||||
|
||||
# basic query
|
||||
tdSql.query("select c1 from ct3")
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query("select c1 from t1")
|
||||
tdSql.checkRows(12)
|
||||
tdSql.query("select c1 from stb1")
|
||||
tdSql.checkRows(25)
|
||||
|
||||
# used for empty table , ct3 is empty
|
||||
tdSql.query("select stateduration(c6,'GT',1,1s) from ct3")
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query("select stateduration(c6,'GT',1,1s) from ct3")
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query("select stateduration(c6,'GT',1,1s) from ct3")
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query("select stateduration(c6,'GT',1,1s) from ct3")
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query("select stateduration(c6,'GT',1,1s) from ct3")
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query("select stateduration(c6,'GT',1,1s) from ct3")
|
||||
|
||||
# will support _rowts mix with
|
||||
# tdSql.query("select (c6,'GT',1,1s),_rowts from ct3")
|
||||
|
||||
# auto check for t1 table
|
||||
# used for regular table
|
||||
tdSql.query("select stateduration(c6,'GT',1,1s) from t1")
|
||||
|
||||
# unique with super tags
|
||||
|
||||
tdSql.query("select stateduration(c6,'GT',1,1s) from ct1")
|
||||
tdSql.checkRows(13)
|
||||
|
||||
tdSql.query("select stateduration(c6,'GT',1,1s) from ct4")
|
||||
tdSql.checkRows(12)
|
||||
|
||||
tdSql.error("select stateduration(c6,'GT',1,1s),tbname from ct1")
|
||||
tdSql.error("select stateduration(c6,'GT',1,1s),t1 from ct1")
|
||||
|
||||
# unique with common col
|
||||
tdSql.error("select stateduration(c6,'GT',1,1s) ,ts from ct1")
|
||||
tdSql.error("select stateduration(c6,'GT',1,1s) ,c1 from ct1")
|
||||
|
||||
# unique with scalar function
|
||||
tdSql.error("select stateduration(c6,'GT',1,1s) ,abs(c1) from ct1")
|
||||
tdSql.error("select stateduration(c6,'GT',1,1s) , unique(c2) from ct1")
|
||||
tdSql.error("select stateduration(c6,'GT',1,1s) , abs(c2)+2 from ct1")
|
||||
|
||||
|
||||
# unique with aggregate function
|
||||
tdSql.error("select stateduration(c6,'GT',1,1s) ,sum(c1) from ct1")
|
||||
tdSql.error("select stateduration(c6,'GT',1,1s) ,max(c1) from ct1")
|
||||
tdSql.error("select stateduration(c6,'GT',1,1s) ,csum(c1) from ct1")
|
||||
tdSql.error("select stateduration(c6,'GT',1,1s) ,count(c1) from ct1")
|
||||
|
||||
# unique with filter where
|
||||
tdSql.query("select stateduration(c6,'GT',1,1s) from ct4 where c1 is null")
|
||||
tdSql.checkData(0, 0, None)
|
||||
tdSql.checkData(1, 0, None)
|
||||
tdSql.checkData(2, 0, None)
|
||||
|
||||
tdSql.query("select stateduration(c1,'GT',1,1s) from t1 where c1 >2 ")
|
||||
tdSql.checkData(0, 0, 0)
|
||||
tdSql.checkData(1, 0, 10886404)
|
||||
tdSql.checkData(2, 0, 23500810)
|
||||
tdSql.checkData(4, 0, 57456020)
|
||||
tdSql.checkData(5, 0, 60393624)
|
||||
|
||||
tdSql.query("select stateduration(c2,'GT',1,1s) from t1 where c2 between 0 and 99999")
|
||||
tdSql.checkData(0, 0, 0)
|
||||
tdSql.checkData(1, 0, 6134400)
|
||||
tdSql.checkData(6, 0, -1)
|
||||
|
||||
|
||||
# unique with union all
|
||||
tdSql.query("select stateduration(c1,'GT',1,1s) from ct4 union all select stateduration(c1,'GT',1,1s) from ct1")
|
||||
tdSql.checkRows(25)
|
||||
tdSql.query("select stateduration(c1,'GT',1,1s) from ct4 union all select distinct(c1) from ct4")
|
||||
tdSql.checkRows(22)
|
||||
|
||||
# unique with join
|
||||
# prepare join datas with same ts
|
||||
|
||||
tdSql.execute(" use db ")
|
||||
tdSql.execute(" create stable st1 (ts timestamp , num int) tags(ind int)")
|
||||
tdSql.execute(" create table tb1 using st1 tags(1)")
|
||||
tdSql.execute(" create table tb2 using st1 tags(2)")
|
||||
|
||||
tdSql.execute(" create stable st2 (ts timestamp , num int) tags(ind int)")
|
||||
tdSql.execute(" create table ttb1 using st2 tags(1)")
|
||||
tdSql.execute(" create table ttb2 using st2 tags(2)")
|
||||
|
||||
start_ts = 1622369635000 # 2021-05-30 18:13:55
|
||||
|
||||
for i in range(10):
|
||||
ts_value = start_ts+i*1000
|
||||
tdSql.execute(f" insert into tb1 values({ts_value} , {i})")
|
||||
tdSql.execute(f" insert into tb2 values({ts_value} , {i})")
|
||||
|
||||
tdSql.execute(f" insert into ttb1 values({ts_value} , {i})")
|
||||
tdSql.execute(f" insert into ttb2 values({ts_value} , {i})")
|
||||
|
||||
tdSql.query("select stateduration(tb1.num,'GT',1,1s) from tb1, tb2 where tb1.ts=tb2.ts ")
|
||||
tdSql.checkRows(10)
|
||||
tdSql.checkData(0,0,-1)
|
||||
tdSql.checkData(1,0,-1)
|
||||
tdSql.checkData(2,0,0)
|
||||
tdSql.checkData(9,0,7)
|
||||
|
||||
tdSql.query("select stateduration(tb1.num,'GT',1,1s) from tb1, tb2 where tb1.ts=tb2.ts union all select stateduration(tb2.num,'GT',1,1s) from tb1, tb2 where tb1.ts=tb2.ts ")
|
||||
tdSql.checkRows(20)
|
||||
|
||||
# nest query
|
||||
# tdSql.query("select unique(c1) from (select c1 from ct1)")
|
||||
tdSql.query("select c1 from (select stateduration(c1,'GT',1,1s) c1 from t1)")
|
||||
tdSql.checkRows(12)
|
||||
tdSql.checkData(0, 0, None)
|
||||
tdSql.checkData(1, 0, -1)
|
||||
tdSql.checkData(2, 0, 0)
|
||||
tdSql.checkData(10, 0, 63072035)
|
||||
|
||||
tdSql.query("select sum(c1) from (select stateduration(c1,'GT',1,1d) c1 from t1)")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, 2893)
|
||||
|
||||
tdSql.query("select sum(c1) from (select distinct(c1) c1 from ct1) union all select sum(c1) from (select stateduration(c1,'GT',1,1s) c1 from ct1)")
|
||||
tdSql.checkRows(2)
|
||||
|
||||
tdSql.query("select 1-abs(c1) from (select stateduration(c1,'GT',1,1s) c1 from t1)")
|
||||
tdSql.checkRows(12)
|
||||
tdSql.checkData(0, 0, None)
|
||||
tdSql.checkData(1, 0, 0.000000000)
|
||||
tdSql.checkData(3, 0, -86404.000000000)
|
||||
|
||||
|
||||
# bug for stable
|
||||
#partition by tbname
|
||||
# tdSql.query(" select unique(c1) from stb1 partition by tbname ")
|
||||
# tdSql.checkRows(21)
|
||||
|
||||
# tdSql.query(" select unique(c1) from stb1 partition by tbname ")
|
||||
# tdSql.checkRows(21)
|
||||
|
||||
# group by
|
||||
tdSql.error("select stateduration(c1,'GT',1,1s) from ct1 group by c1")
|
||||
tdSql.error("select stateduration(c1,'GT',1,1s) from ct1 group by tbname")
|
||||
|
||||
# super table
|
||||
|
||||
def check_unit_time(self):
|
||||
tdSql.execute(" use db ")
|
||||
tdSql.error("select stateduration(c1,'GT',1,1b) from ct1")
|
||||
tdSql.error("select stateduration(c1,'GT',1,1u) from ct1")
|
||||
tdSql.query("select stateduration(c1,'GT',1,1s) from t1")
|
||||
tdSql.checkData(10,0,63072035)
|
||||
tdSql.query("select stateduration(c1,'GT',1,1000s) from t1")
|
||||
tdSql.checkData(10,0,int(63072035/1000))
|
||||
tdSql.query("select stateduration(c1,'GT',1,1m) from t1")
|
||||
tdSql.checkData(10,0,int(63072035/60))
|
||||
tdSql.query("select stateduration(c1,'GT',1,1h) from t1")
|
||||
tdSql.checkData(10,0,int(63072035/60/60))
|
||||
tdSql.query("select stateduration(c1,'GT',1,1d) from t1")
|
||||
tdSql.checkData(10,0,int(63072035/60/24/60))
|
||||
tdSql.query("select stateduration(c1,'GT',1,1w) from t1")
|
||||
tdSql.checkData(10,0,int(63072035/60/7/24/60))
|
||||
|
||||
|
||||
def check_boundary_values(self):
|
||||
|
||||
tdSql.execute("drop database if exists bound_test")
|
||||
tdSql.execute("create database if not exists bound_test")
|
||||
tdSql.execute("use bound_test")
|
||||
tdSql.execute(
|
||||
"create table stb_bound (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(32),c9 nchar(32), c10 timestamp) tags (t1 int);"
|
||||
)
|
||||
tdSql.execute(f'create table sub1_bound using stb_bound tags ( 1 )')
|
||||
tdSql.execute(
|
||||
f"insert into sub1_bound values ( now()-1s, 2147483647, 9223372036854775807, 32767, 127, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
|
||||
)
|
||||
tdSql.execute(
|
||||
f"insert into sub1_bound values ( now(), 2147483646, 9223372036854775806, 32766, 126, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
|
||||
)
|
||||
|
||||
tdSql.execute(
|
||||
f"insert into sub1_bound values ( now(), -2147483646, -9223372036854775806, -32766, -126, -3.40E+38, -1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
|
||||
)
|
||||
|
||||
tdSql.execute(
|
||||
f"insert into sub1_bound values ( now(), 2147483643, 9223372036854775803, 32763, 123, 3.39E+38, 1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
|
||||
)
|
||||
|
||||
tdSql.execute(
|
||||
f"insert into sub1_bound values ( now(), -2147483643, -9223372036854775803, -32763, -123, -3.39E+38, -1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
|
||||
)
|
||||
|
||||
tdSql.error(
|
||||
f"insert into sub1_bound values ( now()+1s, 2147483648, 9223372036854775808, 32768, 128, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
|
||||
)
|
||||
|
||||
tdSql.query("select stateduration(c1,'GT',1,1s) from sub1_bound")
|
||||
tdSql.checkRows(5)
|
||||
|
||||
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
|
||||
tdSql.prepare()
|
||||
|
||||
tdLog.printNoPrefix("==========step1:create table ==============")
|
||||
|
||||
self.prepare_datas()
|
||||
|
||||
tdLog.printNoPrefix("==========step2:test errors ==============")
|
||||
|
||||
self.test_errors()
|
||||
|
||||
tdLog.printNoPrefix("==========step3:support types ============")
|
||||
|
||||
self.support_types()
|
||||
|
||||
tdLog.printNoPrefix("==========step4:support opers ============")
|
||||
self.support_opers()
|
||||
|
||||
tdLog.printNoPrefix("==========step5: stateduration basic query ============")
|
||||
|
||||
self.basic_stateduration_function()
|
||||
|
||||
tdLog.printNoPrefix("==========step6: stateduration boundary query ============")
|
||||
|
||||
self.check_boundary_values()
|
||||
|
||||
tdLog.printNoPrefix("==========step6: stateduration unit time test ============")
|
||||
|
||||
self.check_unit_time()
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -0,0 +1,431 @@
|
|||
from math import floor
|
||||
from random import randint, random
|
||||
from numpy import equal
|
||||
import taos
|
||||
import sys
|
||||
import datetime
|
||||
import inspect
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
|
||||
class TDTestCase:
|
||||
updatecfgDict = {'debugFlag': 143 ,"cDebugFlag":143,"uDebugFlag":143 ,"rpcDebugFlag":143 , "tmrDebugFlag":143 ,
|
||||
"jniDebugFlag":143 ,"simDebugFlag":143,"dDebugFlag":143, "dDebugFlag":143,"vDebugFlag":143,"mDebugFlag":143,"qDebugFlag":143,
|
||||
"wDebugFlag":143,"sDebugFlag":143,"tsdbDebugFlag":143,"tqDebugFlag":143 ,"fsDebugFlag":143 ,"fnDebugFlag":143}
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
|
||||
def prepare_datas(self):
|
||||
tdSql.execute(
|
||||
'''create table stb1
|
||||
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
|
||||
tags (t1 int)
|
||||
'''
|
||||
)
|
||||
|
||||
tdSql.execute(
|
||||
'''
|
||||
create table t1
|
||||
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
|
||||
'''
|
||||
)
|
||||
for i in range(4):
|
||||
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
|
||||
|
||||
for i in range(9):
|
||||
tdSql.execute(
|
||||
f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
|
||||
)
|
||||
tdSql.execute(
|
||||
f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
|
||||
)
|
||||
tdSql.execute("insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )")
|
||||
tdSql.execute("insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
|
||||
tdSql.execute("insert into ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )")
|
||||
tdSql.execute("insert into ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
|
||||
|
||||
tdSql.execute("insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
|
||||
tdSql.execute("insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
|
||||
tdSql.execute("insert into ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
|
||||
|
||||
tdSql.execute(
|
||||
f'''insert into t1 values
|
||||
( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
|
||||
( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
|
||||
( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
|
||||
( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
|
||||
( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a )
|
||||
( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
|
||||
( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
|
||||
( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
|
||||
( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
|
||||
( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
|
||||
( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
|
||||
( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
|
||||
'''
|
||||
)
|
||||
|
||||
def test_errors(self):
|
||||
error_sql_lists = [
|
||||
# "select statecount(c1,'GT',5) from t1"
|
||||
"select statecount from t1",
|
||||
"select statecount(123--123)==1 from t1",
|
||||
"select statecount(123,123) from t1",
|
||||
"select statecount(c1,ts) from t1",
|
||||
"select statecount(c1,c1,ts) from t1",
|
||||
"select statecount(c1 ,c2 ) from t1",
|
||||
"select statecount(c1 ,NULL) from t1",
|
||||
#"select statecount(c1 ,'NULL',1.0) from t1",
|
||||
"select statecount(c1 ,'GT','1') from t1",
|
||||
"select statecount(c1 ,'GT','tbname') from t1",
|
||||
"select statecount(c1 ,'GT','*') from t1",
|
||||
"select statecount(c1 ,'GT',ts) from t1",
|
||||
"select statecount(c1 ,'GT',max(c1)) from t1",
|
||||
"select statecount(abs(c1) ,'GT',1) from t1",
|
||||
"select statecount(c1+2 ,'GT',1) from t1",
|
||||
"select statecount(c1 ,'GT',1,1u) from t1",
|
||||
"select statecount(c1 ,'GT',1,now) from t1",
|
||||
"select statecount(c1 ,'GT','1') from t1",
|
||||
"select statecount(c1 ,'GT','1',True) from t1",
|
||||
"select statecount(statecount(c1) ab from t1)",
|
||||
"select statecount(c1 ,'GT',1,,)int from t1",
|
||||
"select statecount('c1','GT',1) from t1",
|
||||
"select statecount('c1','GT' , NULL) from t1",
|
||||
"select statecount('c1','GT', 1 , '') from t1",
|
||||
"select statecount('c1','GT', 1 ,c%) from t1",
|
||||
"select statecount(c1 ,'GT',1,t1) from t1",
|
||||
"select statecount(c1 ,'GT',1,True) from t1",
|
||||
"select statecount(c1 ,'GT',1) , count(c1) from t1",
|
||||
"select statecount(c1 ,'GT',1) , avg(c1) from t1",
|
||||
"select statecount(c1 ,'GT',1) , min(c1) from t1",
|
||||
"select statecount(c1 ,'GT',1) , spread(c1) from t1",
|
||||
"select statecount(c1 ,'GT',1) , diff(c1) from t1",
|
||||
"select statecount(c1 ,'GT',1) , abs(c1) from t1",
|
||||
"select statecount(c1 ,'GT',1) , c1 from t1",
|
||||
]
|
||||
for error_sql in error_sql_lists:
|
||||
tdSql.error(error_sql)
|
||||
pass
|
||||
|
||||
def support_types(self):
|
||||
other_no_value_types = [
|
||||
"select statecount(ts,'GT',1) from t1" ,
|
||||
"select statecount(c7,'GT',1) from t1",
|
||||
"select statecount(c8,'GT',1) from t1",
|
||||
"select statecount(c9,'GT',1) from t1",
|
||||
"select statecount(ts,'GT',1) from ct1" ,
|
||||
"select statecount(c7,'GT',1) from ct1",
|
||||
"select statecount(c8,'GT',1) from ct1",
|
||||
"select statecount(c9,'GT',1) from ct1",
|
||||
"select statecount(ts,'GT',1) from ct3" ,
|
||||
"select statecount(c7,'GT',1) from ct3",
|
||||
"select statecount(c8,'GT',1) from ct3",
|
||||
"select statecount(c9,'GT',1) from ct3",
|
||||
"select statecount(ts,'GT',1) from ct4" ,
|
||||
"select statecount(c7,'GT',1) from ct4",
|
||||
"select statecount(c8,'GT',1) from ct4",
|
||||
"select statecount(c9,'GT',1) from ct4",
|
||||
"select statecount(ts,'GT',1) from stb1 partition by tbname" ,
|
||||
"select statecount(c7,'GT',1) from stb1 partition by tbname",
|
||||
"select statecount(c8,'GT',1) from stb1 partition by tbname",
|
||||
"select statecount(c9,'GT',1) from stb1 partition by tbname"
|
||||
]
|
||||
|
||||
for type_sql in other_no_value_types:
|
||||
tdSql.error(type_sql)
|
||||
tdLog.info("support type ok , sql is : %s"%type_sql)
|
||||
|
||||
type_sql_lists = [
|
||||
"select statecount(c1,'GT',1) from t1",
|
||||
"select statecount(c2,'GT',1) from t1",
|
||||
"select statecount(c3,'GT',1) from t1",
|
||||
"select statecount(c4,'GT',1) from t1",
|
||||
"select statecount(c5,'GT',1) from t1",
|
||||
"select statecount(c6,'GT',1) from t1",
|
||||
|
||||
"select statecount(c1,'GT',1) from ct1",
|
||||
"select statecount(c2,'GT',1) from ct1",
|
||||
"select statecount(c3,'GT',1) from ct1",
|
||||
"select statecount(c4,'GT',1) from ct1",
|
||||
"select statecount(c5,'GT',1) from ct1",
|
||||
"select statecount(c6,'GT',1) from ct1",
|
||||
|
||||
"select statecount(c1,'GT',1) from ct3",
|
||||
"select statecount(c2,'GT',1) from ct3",
|
||||
"select statecount(c3,'GT',1) from ct3",
|
||||
"select statecount(c4,'GT',1) from ct3",
|
||||
"select statecount(c5,'GT',1) from ct3",
|
||||
"select statecount(c6,'GT',1) from ct3",
|
||||
|
||||
"select statecount(c1,'GT',1) from stb1 partition by tbname",
|
||||
"select statecount(c2,'GT',1) from stb1 partition by tbname",
|
||||
"select statecount(c3,'GT',1) from stb1 partition by tbname",
|
||||
"select statecount(c4,'GT',1) from stb1 partition by tbname",
|
||||
"select statecount(c5,'GT',1) from stb1 partition by tbname",
|
||||
"select statecount(c6,'GT',1) from stb1 partition by tbname",
|
||||
|
||||
"select statecount(c6,'GT',1) as alisb from stb1 partition by tbname",
|
||||
"select statecount(c6,'GT',1) alisb from stb1 partition by tbname",
|
||||
]
|
||||
|
||||
for type_sql in type_sql_lists:
|
||||
tdSql.query(type_sql)
|
||||
|
||||
def support_opers(self):
|
||||
oper_lists = ['LT','lt','Lt','lT','GT','gt','Gt','gT','LE','le','Le','lE','GE','ge','Ge','gE','NE','ne','Ne','nE','EQ','eq','Eq','eQ']
|
||||
|
||||
oper_errors = [",","*","NULL","tbname","ts","sum","_c0"]
|
||||
|
||||
for oper in oper_lists:
|
||||
tdSql.query(f"select statecount(c1 ,'{oper}',1) as col from t1")
|
||||
tdSql.checkRows(12)
|
||||
|
||||
for oper in oper_errors:
|
||||
tdSql.error(f"select statecount(c1 ,'{oper}',1) as col from t1")
|
||||
|
||||
|
||||
def basic_statecount_function(self):
|
||||
|
||||
# basic query
|
||||
tdSql.query("select c1 from ct3")
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query("select c1 from t1")
|
||||
tdSql.checkRows(12)
|
||||
tdSql.query("select c1 from stb1")
|
||||
tdSql.checkRows(25)
|
||||
|
||||
# used for empty table , ct3 is empty
|
||||
tdSql.query("select statecount(c6,'GT',1) from ct3")
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query("select statecount(c6,'GT',1) from ct3")
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query("select statecount(c6,'GT',1) from ct3")
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query("select statecount(c6,'GT',1) from ct3")
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query("select statecount(c6,'GT',1) from ct3")
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query("select statecount(c6,'GT',1) from ct3")
|
||||
|
||||
# will support _rowts mix with
|
||||
# tdSql.query("select (c6,'GT',1),_rowts from ct3")
|
||||
|
||||
# auto check for t1 table
|
||||
# used for regular table
|
||||
tdSql.query("select statecount(c6,'GT',1) from t1")
|
||||
|
||||
# unique with super tags
|
||||
|
||||
tdSql.query("select statecount(c6,'GT',1) from ct1")
|
||||
tdSql.checkRows(13)
|
||||
|
||||
tdSql.query("select statecount(c6,'GT',1) from ct4")
|
||||
tdSql.checkRows(12)
|
||||
|
||||
tdSql.error("select statecount(c6,'GT',1),tbname from ct1")
|
||||
tdSql.error("select statecount(c6,'GT',1),t1 from ct1")
|
||||
|
||||
# unique with common col
|
||||
tdSql.error("select statecount(c6,'GT',1) ,ts from ct1")
|
||||
tdSql.error("select statecount(c6,'GT',1) ,c1 from ct1")
|
||||
|
||||
# unique with scalar function
|
||||
tdSql.error("select statecount(c6,'GT',1) ,abs(c1) from ct1")
|
||||
tdSql.error("select statecount(c6,'GT',1) , unique(c2) from ct1")
|
||||
tdSql.error("select statecount(c6,'GT',1) , abs(c2)+2 from ct1")
|
||||
|
||||
|
||||
# unique with aggregate function
|
||||
tdSql.error("select statecount(c6,'GT',1) ,sum(c1) from ct1")
|
||||
tdSql.error("select statecount(c6,'GT',1) ,max(c1) from ct1")
|
||||
tdSql.error("select statecount(c6,'GT',1) ,csum(c1) from ct1")
|
||||
tdSql.error("select statecount(c6,'GT',1) ,count(c1) from ct1")
|
||||
|
||||
# unique with filter where
|
||||
tdSql.query("select statecount(c6,'GT',1) from ct4 where c1 is null")
|
||||
tdSql.checkData(0, 0, None)
|
||||
tdSql.checkData(1, 0, None)
|
||||
tdSql.checkData(2, 0, None)
|
||||
|
||||
tdSql.query("select statecount(c1,'GT',1) from t1 where c1 >2 ")
|
||||
tdSql.checkData(0, 0, 1)
|
||||
tdSql.checkData(1, 0, 2)
|
||||
tdSql.checkData(2, 0, 3)
|
||||
tdSql.checkData(4, 0, 5)
|
||||
tdSql.checkData(5, 0, 6)
|
||||
|
||||
tdSql.query("select statecount(c2,'GT',1) from t1 where c2 between 0 and 99999")
|
||||
tdSql.checkData(0, 0, 1)
|
||||
tdSql.checkData(1, 0, 2)
|
||||
tdSql.checkData(6, 0, -1)
|
||||
|
||||
|
||||
# unique with union all
|
||||
tdSql.query("select statecount(c1,'GT',1) from ct4 union all select statecount(c1,'GT',1) from ct1")
|
||||
tdSql.checkRows(25)
|
||||
tdSql.query("select statecount(c1,'GT',1) from ct4 union all select distinct(c1) from ct4")
|
||||
tdSql.checkRows(22)
|
||||
|
||||
# unique with join
|
||||
# prepare join datas with same ts
|
||||
|
||||
tdSql.execute(" use db ")
|
||||
tdSql.execute(" create stable st1 (ts timestamp , num int) tags(ind int)")
|
||||
tdSql.execute(" create table tb1 using st1 tags(1)")
|
||||
tdSql.execute(" create table tb2 using st1 tags(2)")
|
||||
|
||||
tdSql.execute(" create stable st2 (ts timestamp , num int) tags(ind int)")
|
||||
tdSql.execute(" create table ttb1 using st2 tags(1)")
|
||||
tdSql.execute(" create table ttb2 using st2 tags(2)")
|
||||
|
||||
start_ts = 1622369635000 # 2021-05-30 18:13:55
|
||||
|
||||
for i in range(10):
|
||||
ts_value = start_ts+i*1000
|
||||
tdSql.execute(f" insert into tb1 values({ts_value} , {i})")
|
||||
tdSql.execute(f" insert into tb2 values({ts_value} , {i})")
|
||||
|
||||
tdSql.execute(f" insert into ttb1 values({ts_value} , {i})")
|
||||
tdSql.execute(f" insert into ttb2 values({ts_value} , {i})")
|
||||
|
||||
tdSql.query("select statecount(tb1.num,'GT',1) from tb1, tb2 where tb1.ts=tb2.ts ")
|
||||
tdSql.checkRows(10)
|
||||
tdSql.checkData(0,0,-1)
|
||||
tdSql.checkData(1,0,-1)
|
||||
tdSql.checkData(2,0,1)
|
||||
tdSql.checkData(9,0,8)
|
||||
|
||||
tdSql.query("select statecount(tb1.num,'GT',1) from tb1, tb2 where tb1.ts=tb2.ts union all select statecount(tb2.num,'GT',1) from tb1, tb2 where tb1.ts=tb2.ts ")
|
||||
tdSql.checkRows(20)
|
||||
|
||||
# nest query
|
||||
# tdSql.query("select unique(c1) from (select c1 from ct1)")
|
||||
tdSql.query("select c1 from (select statecount(c1,'GT',1) c1 from t1)")
|
||||
tdSql.checkRows(12)
|
||||
tdSql.checkData(0, 0, None)
|
||||
tdSql.checkData(1, 0, -1)
|
||||
tdSql.checkData(2, 0, 1)
|
||||
tdSql.checkData(10, 0, 8)
|
||||
|
||||
tdSql.query("select sum(c1) from (select statecount(c1,'GT',1) c1 from t1)")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, 35)
|
||||
|
||||
tdSql.query("select sum(c1) from (select distinct(c1) c1 from ct1) union all select sum(c1) from (select statecount(c1,'GT',1) c1 from ct1)")
|
||||
tdSql.checkRows(2)
|
||||
|
||||
tdSql.query("select 1-abs(c1) from (select statecount(c1,'GT',1) c1 from t1)")
|
||||
tdSql.checkRows(12)
|
||||
tdSql.checkData(0, 0, None)
|
||||
tdSql.checkData(1, 0, 0.000000000)
|
||||
tdSql.checkData(3, 0, -1.000000000)
|
||||
|
||||
|
||||
# bug for stable
|
||||
#partition by tbname
|
||||
# tdSql.query(" select unique(c1) from stb1 partition by tbname ")
|
||||
# tdSql.checkRows(21)
|
||||
|
||||
# tdSql.query(" select unique(c1) from stb1 partition by tbname ")
|
||||
# tdSql.checkRows(21)
|
||||
|
||||
# group by
|
||||
tdSql.query("select statecount(c1,'GT',1) from ct1 group by c1")
|
||||
tdSql.error("select statecount(c1,'GT',1) from ct1 group by tbname")
|
||||
|
||||
# super table
|
||||
|
||||
def check_unit_time(self):
|
||||
tdSql.execute(" use db ")
|
||||
tdSql.error("select stateduration(c1,'GT',1,1b) from ct1")
|
||||
tdSql.error("select stateduration(c1,'GT',1,1u) from ct1")
|
||||
tdSql.query("select stateduration(c1,'GT',1,1s) from t1")
|
||||
tdSql.checkData(10,0,63072035)
|
||||
tdSql.query("select stateduration(c1,'GT',1,1000s) from t1")
|
||||
tdSql.checkData(10,0,int(63072035/1000))
|
||||
tdSql.query("select stateduration(c1,'GT',1,1m) from t1")
|
||||
tdSql.checkData(10,0,int(63072035/60))
|
||||
tdSql.query("select stateduration(c1,'GT',1,1h) from t1")
|
||||
tdSql.checkData(10,0,int(63072035/60/60))
|
||||
tdSql.query("select stateduration(c1,'GT',1,1d) from t1")
|
||||
tdSql.checkData(10,0,int(63072035/60/24/60))
|
||||
tdSql.query("select stateduration(c1,'GT',1,1w) from t1")
|
||||
tdSql.checkData(10,0,int(63072035/60/7/24/60))
|
||||
|
||||
|
||||
def check_boundary_values(self):
|
||||
|
||||
tdSql.execute("drop database if exists bound_test")
|
||||
tdSql.execute("create database if not exists bound_test")
|
||||
tdSql.execute("use bound_test")
|
||||
tdSql.execute(
|
||||
"create table stb_bound (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(32),c9 nchar(32), c10 timestamp) tags (t1 int);"
|
||||
)
|
||||
tdSql.execute(f'create table sub1_bound using stb_bound tags ( 1 )')
|
||||
tdSql.execute(
|
||||
f"insert into sub1_bound values ( now()-1s, 2147483647, 9223372036854775807, 32767, 127, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
|
||||
)
|
||||
tdSql.execute(
|
||||
f"insert into sub1_bound values ( now(), 2147483646, 9223372036854775806, 32766, 126, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
|
||||
)
|
||||
|
||||
tdSql.execute(
|
||||
f"insert into sub1_bound values ( now(), -2147483646, -9223372036854775806, -32766, -126, -3.40E+38, -1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
|
||||
)
|
||||
|
||||
tdSql.execute(
|
||||
f"insert into sub1_bound values ( now(), 2147483643, 9223372036854775803, 32763, 123, 3.39E+38, 1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
|
||||
)
|
||||
|
||||
tdSql.execute(
|
||||
f"insert into sub1_bound values ( now(), -2147483643, -9223372036854775803, -32763, -123, -3.39E+38, -1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
|
||||
)
|
||||
|
||||
tdSql.error(
|
||||
f"insert into sub1_bound values ( now()+1s, 2147483648, 9223372036854775808, 32768, 128, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
|
||||
)
|
||||
|
||||
tdSql.query("select statecount(c1,'GT',1) from sub1_bound")
|
||||
tdSql.checkRows(5)
|
||||
|
||||
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
|
||||
tdSql.prepare()
|
||||
|
||||
tdLog.printNoPrefix("==========step1:create table ==============")
|
||||
|
||||
self.prepare_datas()
|
||||
|
||||
tdLog.printNoPrefix("==========step2:test errors ==============")
|
||||
|
||||
self.test_errors()
|
||||
|
||||
tdLog.printNoPrefix("==========step3:support types ============")
|
||||
|
||||
self.support_types()
|
||||
|
||||
tdLog.printNoPrefix("==========step4:support opers ============")
|
||||
self.support_opers()
|
||||
|
||||
tdLog.printNoPrefix("==========step5: statecount basic query ============")
|
||||
|
||||
self.basic_statecount_function()
|
||||
|
||||
tdLog.printNoPrefix("==========step6: statecount boundary query ============")
|
||||
|
||||
self.check_boundary_values()
|
||||
|
||||
tdLog.printNoPrefix("==========step6: statecount unit time test ============")
|
||||
|
||||
self.check_unit_time()
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -87,6 +87,8 @@ python3 ./test.py -f 2-query/sample.py
|
|||
python3 ./test.py -f 2-query/function_diff.py
|
||||
python3 ./test.py -f 2-query/unique.py
|
||||
python3 ./test.py -f 2-query/stateduration.py
|
||||
python3 ./test.py -f 2-query/function_stateduration.py
|
||||
python3 ./test.py -f 2-query/statecount.py
|
||||
|
||||
python3 ./test.py -f 7-tmq/basic5.py
|
||||
python3 ./test.py -f 7-tmq/subscribeDb.py
|
||||
|
|
Loading…
Reference in New Issue