normal table tsma and add tests
This commit is contained in:
parent
70ca830378
commit
6e7dc4b7b4
|
@ -3568,6 +3568,7 @@ typedef struct {
|
||||||
char* ast;
|
char* ast;
|
||||||
int64_t deleteMark;
|
int64_t deleteMark;
|
||||||
int64_t lastTs;
|
int64_t lastTs;
|
||||||
|
int64_t normSourceTbUid; // the Uid of source tb if its a normal table, otherwise 0
|
||||||
} SMCreateSmaReq;
|
} SMCreateSmaReq;
|
||||||
|
|
||||||
int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);
|
int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);
|
||||||
|
@ -4193,7 +4194,7 @@ int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp);
|
||||||
void tFreeSViewMetaRsp(SViewMetaRsp* pRsp);
|
void tFreeSViewMetaRsp(SViewMetaRsp* pRsp);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN]; // table name or tsma name
|
char name[TSDB_TABLE_FNAME_LEN]; // table name or tsma name
|
||||||
bool fetchingTsma; // if we are fetching with tsma name
|
bool fetchingWithTsmaName; // if we are fetching with tsma name
|
||||||
}STableTSMAInfoReq;
|
}STableTSMAInfoReq;
|
||||||
|
|
||||||
int32_t tSerializeTableTSMAInfoReq(void* buf, int32_t bufLen, const STableTSMAInfoReq* pReq);
|
int32_t tSerializeTableTSMAInfoReq(void* buf, int32_t bufLen, const STableTSMAInfoReq* pReq);
|
||||||
|
|
|
@ -887,6 +887,7 @@ int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq
|
||||||
}
|
}
|
||||||
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->lastTs) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->lastTs) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pReq->normSourceTbUid) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -937,6 +938,7 @@ int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pR
|
||||||
}
|
}
|
||||||
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->lastTs) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->lastTs) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &pReq->normSourceTbUid) < 0) return -1;
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -9943,7 +9945,7 @@ int32_t tSerializeTableTSMAInfoReq(void* buf, int32_t bufLen, const STableTSMAIn
|
||||||
|
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->fetchingTsma) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->fetchingWithTsmaName) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -9958,7 +9960,7 @@ int32_t tDeserializeTableTSMAInfoReq(void* buf, int32_t bufLen, STableTSMAInfoRe
|
||||||
|
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, (uint8_t*)&pReq->fetchingTsma) < 0) return -1;
|
if (tDecodeI8(&decoder, (uint8_t*)&pReq->fetchingWithTsmaName) < 0) return -1;
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
|
|
@ -1389,14 +1389,14 @@ static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
|
||||||
|
|
||||||
static void initSMAObj(SCreateTSMACxt* pCxt) {
|
static void initSMAObj(SCreateTSMACxt* pCxt) {
|
||||||
memcpy(pCxt->pSma->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
|
memcpy(pCxt->pSma->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
|
||||||
memcpy(pCxt->pSma->stb, pCxt->pSrcStb->name, TSDB_TABLE_FNAME_LEN);
|
memcpy(pCxt->pSma->stb, pCxt->pCreateSmaReq->stb, TSDB_TABLE_FNAME_LEN);
|
||||||
memcpy(pCxt->pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
|
memcpy(pCxt->pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
pCxt->pSma->createdTime = taosGetTimestampMs();
|
pCxt->pSma->createdTime = taosGetTimestampMs();
|
||||||
pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
|
pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
|
||||||
|
|
||||||
memcpy(pCxt->pSma->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
|
memcpy(pCxt->pSma->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
|
||||||
pCxt->pSma->dstTbUid = 0; // not used
|
pCxt->pSma->dstTbUid = 0; // not used
|
||||||
pCxt->pSma->stbUid = pCxt->pSrcStb->uid;
|
pCxt->pSma->stbUid = pCxt->pSrcStb ? pCxt->pSrcStb->uid : pCxt->pCreateSmaReq->normSourceTbUid;
|
||||||
pCxt->pSma->dbUid = pCxt->pDb->uid;
|
pCxt->pSma->dbUid = pCxt->pDb->uid;
|
||||||
pCxt->pSma->interval = pCxt->pCreateSmaReq->interval;
|
pCxt->pSma->interval = pCxt->pCreateSmaReq->interval;
|
||||||
pCxt->pSma->intervalUnit = pCxt->pCreateSmaReq->intervalUnit;
|
pCxt->pSma->intervalUnit = pCxt->pCreateSmaReq->intervalUnit;
|
||||||
|
@ -1442,7 +1442,7 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
|
||||||
pCxt->pCreateStreamReq->fillHistory = STREAM_FILL_HISTORY_ON;
|
pCxt->pCreateStreamReq->fillHistory = STREAM_FILL_HISTORY_ON;
|
||||||
pCxt->pCreateStreamReq->maxDelay = 10000;
|
pCxt->pCreateStreamReq->maxDelay = 10000;
|
||||||
pCxt->pCreateStreamReq->watermark = 0;
|
pCxt->pCreateStreamReq->watermark = 0;
|
||||||
pCxt->pCreateStreamReq->numOfTags = pCxt->pSrcStb->numOfTags;
|
pCxt->pCreateStreamReq->numOfTags = pCxt->pSrcStb ? pCxt->pSrcStb->numOfTags : 0;
|
||||||
pCxt->pCreateStreamReq->checkpointFreq = 0;
|
pCxt->pCreateStreamReq->checkpointFreq = 0;
|
||||||
pCxt->pCreateStreamReq->createStb = 1;
|
pCxt->pCreateStreamReq->createStb = 1;
|
||||||
pCxt->pCreateStreamReq->targetStbUid = 0;
|
pCxt->pCreateStreamReq->targetStbUid = 0;
|
||||||
|
@ -1454,15 +1454,17 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
|
||||||
pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql);
|
pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql);
|
||||||
|
|
||||||
// construct tags
|
// construct tags
|
||||||
pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pSrcStb->numOfTags, sizeof(SField));
|
if (pCxt->pSrcStb) {
|
||||||
for (int32_t idx = 0; idx < pCxt->pSrcStb->numOfTags; ++idx) {
|
pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pSrcStb->numOfTags, sizeof(SField));
|
||||||
SField f = {0};
|
for (int32_t idx = 0; idx < pCxt->pSrcStb->numOfTags; ++idx) {
|
||||||
SSchema *pSchema = &pCxt->pSrcStb->pTags[idx];
|
SField f = {0};
|
||||||
f.bytes = pSchema->bytes;
|
SSchema *pSchema = &pCxt->pSrcStb->pTags[idx];
|
||||||
f.type = pSchema->type;
|
f.bytes = pSchema->bytes;
|
||||||
f.flags = pSchema->flags;
|
f.type = pSchema->type;
|
||||||
tstrncpy(f.name, pSchema->name, TSDB_COL_NAME_LEN);
|
f.flags = pSchema->flags;
|
||||||
taosArrayPush(pCxt->pCreateStreamReq->pTags, &f);
|
tstrncpy(f.name, pSchema->name, TSDB_COL_NAME_LEN);
|
||||||
|
taosArrayPush(pCxt->pCreateStreamReq->pTags, &f);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1605,11 +1607,13 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
|
||||||
// TODO handle normal table
|
// TODO handle normal table
|
||||||
pStb = mndAcquireStb(pMnode, createReq.stb);
|
if (createReq.normSourceTbUid == 0) {
|
||||||
if (!pStb) {
|
pStb = mndAcquireStb(pMnode, createReq.stb);
|
||||||
mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
|
if (!pStb) {
|
||||||
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
|
||||||
goto _OVER;
|
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
|
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
|
@ -1672,7 +1676,7 @@ _OVER:
|
||||||
mError("tsma:%s, failed to create since %s", createReq.name, terrstr());
|
mError("tsma:%s, failed to create since %s", createReq.name, terrstr());
|
||||||
}
|
}
|
||||||
|
|
||||||
mndReleaseStb(pMnode, pStb);
|
if (pStb) mndReleaseStb(pMnode, pStb);
|
||||||
mndReleaseSma(pMnode, pSma);
|
mndReleaseSma(pMnode, pSma);
|
||||||
mndReleaseStream(pMnode, pStream);
|
mndReleaseStream(pMnode, pStream);
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
@ -2032,12 +2036,15 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp
|
||||||
SSdb * pSdb = pMnode->pSdb;
|
SSdb * pSdb = pMnode->pSdb;
|
||||||
void * pIter = NULL;
|
void * pIter = NULL;
|
||||||
|
|
||||||
|
SStbObj *pStb = NULL;
|
||||||
|
/*
|
||||||
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
|
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
|
||||||
if (NULL == pStb) {
|
if (NULL == pStb) {
|
||||||
*exist = false;
|
*exist = false;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
mndReleaseStb(pMnode, pStb);
|
mndReleaseStb(pMnode, pStb);
|
||||||
|
*/
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||||
|
@ -2093,7 +2100,7 @@ static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsmaReq.fetchingTsma) {
|
if (tsmaReq.fetchingWithTsmaName) {
|
||||||
code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
|
code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
|
||||||
} else {
|
} else {
|
||||||
code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
|
code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
|
||||||
|
|
|
@ -10641,6 +10641,9 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
|
||||||
numOfTags = pTableMeta->tableInfo.numOfTags;
|
numOfTags = pTableMeta->tableInfo.numOfTags;
|
||||||
pCols = pTableMeta->schema;
|
pCols = pTableMeta->schema;
|
||||||
pTags = pTableMeta->schema + numOfCols;
|
pTags = pTableMeta->schema + numOfCols;
|
||||||
|
if (pTableMeta->tableType == TSDB_NORMAL_TABLE) {
|
||||||
|
pReq->normSourceTbUid = pTableMeta->uid;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6570,8 +6570,6 @@ static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan
|
||||||
SLogicSubplan* pSubplan = tsmaOptCtx.generatedSubPlans[i];
|
SLogicSubplan* pSubplan = tsmaOptCtx.generatedSubPlans[i];
|
||||||
if (!pSubplan) continue;
|
if (!pSubplan) continue;
|
||||||
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
||||||
pSubplan->id.groupId = pLogicSubplan->id.groupId + 1;
|
|
||||||
pSubplan->id.queryId = pLogicSubplan->id.queryId;
|
|
||||||
nodesListMakeAppend(&pLogicSubplan->pChildren, (SNode*)pSubplan);
|
nodesListMakeAppend(&pLogicSubplan->pChildren, (SNode*)pSubplan);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -323,7 +323,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
||||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||||
return (!stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) ||
|
return (!stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) ||
|
||||||
isPartTableAgg((SAggLogicNode*)pNode)) &&
|
isPartTableAgg((SAggLogicNode*)pNode)) &&
|
||||||
stbSplHasMultiTbScan(streamQuery, pNode) && !stbSplIsTableCountQuery(pNode);
|
(stbSplHasMultiTbScan(streamQuery, pNode) && !stbSplIsTableCountQuery(pNode));
|
||||||
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
||||||
return stbSplNeedSplitWindow(streamQuery, pNode);
|
return stbSplNeedSplitWindow(streamQuery, pNode);
|
||||||
case QUERY_NODE_LOGIC_PLAN_SORT:
|
case QUERY_NODE_LOGIC_PLAN_SORT:
|
||||||
|
|
|
@ -329,7 +329,7 @@ int32_t queryBuildGetTSMAMsg(void *input, char **msg, int32_t msgSize, int32_t *
|
||||||
}
|
}
|
||||||
|
|
||||||
STableTSMAInfoReq req = {0};
|
STableTSMAInfoReq req = {0};
|
||||||
req.fetchingTsma = true;
|
req.fetchingWithTsmaName = true;
|
||||||
strncpy(req.name, input, sizeof(req.name) - 1);
|
strncpy(req.name, input, sizeof(req.name) - 1);
|
||||||
|
|
||||||
int32_t bufLen = tSerializeTableTSMAInfoReq(NULL, 0, &req);
|
int32_t bufLen = tSerializeTableTSMAInfoReq(NULL, 0, &req);
|
||||||
|
|
|
@ -81,15 +81,15 @@ class TSMAQueryContext:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
class TSMAQueryContextBuilder:
|
class TSMAQCBuilder:
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self.ctx: TSMAQueryContext = TSMAQueryContext()
|
self.qc_: TSMAQueryContext = TSMAQueryContext()
|
||||||
|
|
||||||
def get_ctx(self) -> TSMAQueryContext:
|
def get_qc(self) -> TSMAQueryContext:
|
||||||
return self.ctx
|
return self.qc_
|
||||||
|
|
||||||
def with_sql(self, sql: str):
|
def with_sql(self, sql: str):
|
||||||
self.ctx.sql = sql
|
self.qc_.sql = sql
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def to_timestamp(self, ts: str) -> float:
|
def to_timestamp(self, ts: str) -> float:
|
||||||
|
@ -99,25 +99,25 @@ class TSMAQueryContextBuilder:
|
||||||
res = tdSql.queryResult[0][0]
|
res = tdSql.queryResult[0][0]
|
||||||
return res.timestamp() * 1000
|
return res.timestamp() * 1000
|
||||||
|
|
||||||
def should_query_with_table(self, tb_name: str, ts_begin: str, ts_end: str) -> 'TSMAQueryContextBuilder':
|
def should_query_with_table(self, tb_name: str, ts_begin: str, ts_end: str) -> 'TSMAQCBuilder':
|
||||||
used_tsma: UsedTsma = UsedTsma()
|
used_tsma: UsedTsma = UsedTsma()
|
||||||
used_tsma.name = tb_name
|
used_tsma.name = tb_name
|
||||||
used_tsma.time_range_start = self.to_timestamp(ts_begin)
|
used_tsma.time_range_start = self.to_timestamp(ts_begin)
|
||||||
used_tsma.time_range_end = self.to_timestamp(ts_end)
|
used_tsma.time_range_end = self.to_timestamp(ts_end)
|
||||||
used_tsma.is_tsma_ = False
|
used_tsma.is_tsma_ = False
|
||||||
self.ctx.used_tsmas.append(used_tsma)
|
self.qc_.used_tsmas.append(used_tsma)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def should_query_with_tsma(self, tsma_name: str, ts_begin: str, ts_end: str) -> 'TSMAQueryContextBuilder':
|
def should_query_with_tsma(self, tsma_name: str, ts_begin: str, ts_end: str) -> 'TSMAQCBuilder':
|
||||||
used_tsma: UsedTsma = UsedTsma()
|
used_tsma: UsedTsma = UsedTsma()
|
||||||
used_tsma.name = tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX
|
used_tsma.name = tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX
|
||||||
used_tsma.time_range_start = self.to_timestamp(ts_begin)
|
used_tsma.time_range_start = self.to_timestamp(ts_begin)
|
||||||
used_tsma.time_range_end = self.to_timestamp(ts_end)
|
used_tsma.time_range_end = self.to_timestamp(ts_end)
|
||||||
used_tsma.is_tsma_ = True
|
used_tsma.is_tsma_ = True
|
||||||
self.ctx.used_tsmas.append(used_tsma)
|
self.qc_.used_tsmas.append(used_tsma)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
class TSMATestContext:
|
class TSMATester:
|
||||||
def __init__(self, tdSql: TDSql) -> None:
|
def __init__(self, tdSql: TDSql) -> None:
|
||||||
self.tsmas = []
|
self.tsmas = []
|
||||||
self.tdSql: TDSql = tdSql
|
self.tdSql: TDSql = tdSql
|
||||||
|
@ -182,7 +182,9 @@ class TSMATestContext:
|
||||||
|
|
||||||
if no_tsma_res is None or tsma_res is None:
|
if no_tsma_res is None or tsma_res is None:
|
||||||
if no_tsma_res != tsma_res:
|
if no_tsma_res != tsma_res:
|
||||||
tdLog.exit("comparing tsma res for: %s got different rows of result: with tsma: %s, with tsma: %s" % (sql, no_tsma_res, tsma_res))
|
tdLog.exit("comparing tsma res for: %s got different rows of result: with tsma: %s, with tsma: %s" % (sql, str(no_tsma_res), str(tsma_res)))
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
if len(no_tsma_res) != len(tsma_res):
|
if len(no_tsma_res) != len(tsma_res):
|
||||||
tdLog.exit("comparing tsma res for: %s got differnt rows of result: without tsma: %d, with tsma: %d" % (sql, len(no_tsma_res), len(tsma_res)))
|
tdLog.exit("comparing tsma res for: %s got differnt rows of result: without tsma: %d, with tsma: %d" % (sql, len(no_tsma_res), len(tsma_res)))
|
||||||
|
@ -200,6 +202,101 @@ class TSMATestContext:
|
||||||
for sql, query_ctx in zip(sqls, expects):
|
for sql, query_ctx in zip(sqls, expects):
|
||||||
self.check_sql(sql, query_ctx)
|
self.check_sql(sql, query_ctx)
|
||||||
|
|
||||||
|
class TSMATesterSQLGeneratorOptions:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
class TSMATestSQLGenerator:
|
||||||
|
def __init__(self, opts: TSMATesterSQLGeneratorOptions):
|
||||||
|
self.db_name_: str = ''
|
||||||
|
self.tb_name_: str = ''
|
||||||
|
self.ts_scan_range_: List[float] = [UsedTsma.TS_MIN, UsedTsma.TS_MAX]
|
||||||
|
self.agg_funcs_: List[str] = []
|
||||||
|
self.tsmas_: List[TSMA] = [] ## currently created tsmas
|
||||||
|
self.opts_: TSMATesterSQLGeneratorOptions = opts
|
||||||
|
|
||||||
|
self.select_list_: List[str] = []
|
||||||
|
self.where_list_: List[str] = []
|
||||||
|
self.group_or_partition_by_list: List[str] = []
|
||||||
|
self.interval: str = ''
|
||||||
|
|
||||||
|
def get_random_type(self, funcs):
|
||||||
|
rand: int = randrange(1, len(funcs))
|
||||||
|
return funcs[rand-1]()
|
||||||
|
|
||||||
|
def generate_one(self) -> str:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def generate_timestamp(self, left: float = -1) -> str:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _generate_between(self):
|
||||||
|
def generate(generator: TSMATestSQLGenerator):
|
||||||
|
left = generator.generate_timestamp()
|
||||||
|
return "BTEWEEN %s and %s" % (left, generator.generate_timestamp(left))
|
||||||
|
return self.get_random_type([lambda: '', generate])
|
||||||
|
|
||||||
|
def _generate_scan_range_operators(self):
|
||||||
|
left = self._generate_scan_range_left()
|
||||||
|
right = self._generate_scan_range_right(float(left.split(' ')[-1]))
|
||||||
|
if len(left) == 0 and len(right) == 0:
|
||||||
|
return ''
|
||||||
|
sql = ' ts '
|
||||||
|
if len(left) > 0:
|
||||||
|
sql += '%s ' % (left)
|
||||||
|
|
||||||
|
if len(right) > 0:
|
||||||
|
if len(sql) > 0:
|
||||||
|
sql += 'and ts '
|
||||||
|
sql += '%s ' % (right)
|
||||||
|
return sql
|
||||||
|
|
||||||
|
def _generate_scan_range_left(self) -> str:
|
||||||
|
def a(g: TSMATestSQLGenerator):
|
||||||
|
return '>= %s' % (g.generate_timestamp())
|
||||||
|
def b(g: TSMATestSQLGenerator):
|
||||||
|
return '> %s' % (g.generate_timestamp())
|
||||||
|
return self.get_random_type([lambda: '', a, b])
|
||||||
|
|
||||||
|
def _generate_scan_range_right(self, left: float) -> str:
|
||||||
|
def a(g:TSMATestSQLGenerator):
|
||||||
|
return '< %s' % (self.generate_timestamp(left))
|
||||||
|
def b(g:TSMATestSQLGenerator):
|
||||||
|
return '<= %s' % (self.generate_timestamp(left))
|
||||||
|
return self._generate_scan_range([lambda: '', a, b])
|
||||||
|
|
||||||
|
## generate ts scan ranges
|
||||||
|
def _generate_scan_range(self) -> str:
|
||||||
|
empty = lambda: ''
|
||||||
|
def a(g:TSMATestSQLGenerator):
|
||||||
|
return g._generate_between()
|
||||||
|
def b(g:TSMATestSQLGenerator):
|
||||||
|
return g._generate_scan_range_operators()
|
||||||
|
def ts_range(g:TSMATestSQLGenerator):
|
||||||
|
return g.get_random_type([a,b])
|
||||||
|
return self.get_random_type([empty, ts_range])
|
||||||
|
|
||||||
|
def _generate_where_conditions(self) -> str:
|
||||||
|
pass
|
||||||
|
|
||||||
|
## generate func in tsmas(select list)
|
||||||
|
def _generate_agg_func_for_select(self) -> str:
|
||||||
|
pass
|
||||||
|
|
||||||
|
## generate group by tbname, or exprs containing tbnames
|
||||||
|
def _generate_tbname_for_group_partition_by(self) -> str:
|
||||||
|
pass
|
||||||
|
|
||||||
|
## generate group by tags, or exprs containing tags
|
||||||
|
def _generate_tag_for_group_partition_by(self) -> str:
|
||||||
|
pass
|
||||||
|
|
||||||
|
## interval, sliding, offset
|
||||||
|
def _generate_interval(self) -> str:
|
||||||
|
pass
|
||||||
|
|
||||||
|
## order by, limit, having, subquery...
|
||||||
|
|
||||||
class TDTestCase:
|
class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 4
|
self.vgroups = 4
|
||||||
|
@ -211,7 +308,7 @@ class TDTestCase:
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
tdLog.debug(f"start to excute {__file__}")
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
tdSql.init(conn.cursor(), False)
|
tdSql.init(conn.cursor(), False)
|
||||||
self.test_ctx: TSMATestContext = TSMATestContext(tdSql)
|
self.tsma_tester: TSMATester = TSMATester(tdSql)
|
||||||
|
|
||||||
def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'):
|
def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'):
|
||||||
if dropFlag == 1:
|
if dropFlag == 1:
|
||||||
|
@ -237,6 +334,14 @@ class TDTestCase:
|
||||||
|
|
||||||
tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
|
tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def init_normal_tb(self, tsql, db_name: str, tb_name: str, rows: int, start_ts: int, ts_step: int):
|
||||||
|
sql = 'CREATE TABLE %s.%s (ts timestamp, c1 INT, c2 INT, c3 VARCHAR(255), c4 INT)' % (db_name, tb_name)
|
||||||
|
tsql.execute(sql)
|
||||||
|
sql = 'INSERT INTO %s.%s values' % (db_name, tb_name)
|
||||||
|
for j in range(rows):
|
||||||
|
sql += '(%d, %d,%d,"varchar_%d",%d),' % (start_ts + j * ts_step + randrange(500), j % 10 + randrange(100), j % 10 + randrange(200), j % 10, j % 10)
|
||||||
|
tsql.execute(sql)
|
||||||
|
|
||||||
def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep):
|
def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep):
|
||||||
tdLog.debug("start to insert data ............")
|
tdLog.debug("start to insert data ............")
|
||||||
|
@ -301,7 +406,8 @@ class TDTestCase:
|
||||||
ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],\
|
ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],\
|
||||||
rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\
|
rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\
|
||||||
startTs=paraDict["startTs"],tsStep=paraDict["tsStep"])
|
startTs=paraDict["startTs"],tsStep=paraDict["tsStep"])
|
||||||
return
|
self.init_normal_tb(tdSql, paraDict['dbName'], 'norm_tb', paraDict['rowsPerTbl'], paraDict['startTs'], paraDict['tsStep'])
|
||||||
|
|
||||||
|
|
||||||
def create_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, interval: str):
|
def create_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, interval: str):
|
||||||
tdSql.execute('use %s' % db)
|
tdSql.execute('use %s' % db)
|
||||||
|
@ -329,7 +435,7 @@ class TDTestCase:
|
||||||
|
|
||||||
def check(self, func):
|
def check(self, func):
|
||||||
for ctx in func():
|
for ctx in func():
|
||||||
self.test_ctx.check_sql(ctx.sql, ctx)
|
self.tsma_tester.check_sql(ctx.sql, ctx)
|
||||||
|
|
||||||
def test_query_with_tsma(self):
|
def test_query_with_tsma(self):
|
||||||
self.init_data()
|
self.init_data()
|
||||||
|
@ -337,9 +443,10 @@ class TDTestCase:
|
||||||
self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m')
|
self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m')
|
||||||
self.create_recursive_tsma('tsma1', 'tsma3', 'test', '20m')
|
self.create_recursive_tsma('tsma1', 'tsma3', 'test', '20m')
|
||||||
self.create_recursive_tsma('tsma2', 'tsma4', 'test', '1h')
|
self.create_recursive_tsma('tsma2', 'tsma4', 'test', '1h')
|
||||||
|
self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m')
|
||||||
## why need 5s, calculation not finished yet.
|
## why need 5s, calculation not finished yet.
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
#time.sleep(9999999)
|
time.sleep(9999999)
|
||||||
self.test_query_with_tsma_interval()
|
self.test_query_with_tsma_interval()
|
||||||
self.test_query_with_tsma_agg()
|
self.test_query_with_tsma_agg()
|
||||||
|
|
||||||
|
@ -353,24 +460,24 @@ class TDTestCase:
|
||||||
def test_query_with_tsma_interval_no_partition(self) -> List[TSMAQueryContext]:
|
def test_query_with_tsma_interval_no_partition(self) -> List[TSMAQueryContext]:
|
||||||
ctxs: List[TSMAQueryContext] = []
|
ctxs: List[TSMAQueryContext] = []
|
||||||
sql = 'select avg(c1), avg(c2) from meters interval(5m)'
|
sql = 'select avg(c1), avg(c2) from meters interval(5m)'
|
||||||
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
|
ctxs.append(TSMAQCBuilder().with_sql(sql) \
|
||||||
.should_query_with_tsma('tsma1', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
|
.should_query_with_tsma('tsma1', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc())
|
||||||
|
|
||||||
sql = 'select avg(c1), avg(c2) from meters interval(10m)'
|
sql = 'select avg(c1), avg(c2) from meters interval(10m)'
|
||||||
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
|
ctxs.append(TSMAQCBuilder().with_sql(sql) \
|
||||||
.should_query_with_tsma('tsma1', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
|
.should_query_with_tsma('tsma1', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc())
|
||||||
sql = 'select avg(c1), avg(c2) from meters interval(30m)'
|
sql = 'select avg(c1), avg(c2) from meters interval(30m)'
|
||||||
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
|
ctxs.append(TSMAQCBuilder().with_sql(sql) \
|
||||||
.should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
|
.should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc())
|
||||||
sql = 'select avg(c1), avg(c2) from meters interval(60m)'
|
sql = 'select avg(c1), avg(c2) from meters interval(60m)'
|
||||||
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
|
ctxs.append(TSMAQCBuilder().with_sql(sql) \
|
||||||
.should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
|
.should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc())
|
||||||
|
|
||||||
sql = "select avg(c1), avg(c2) from meters where ts >= '2018-09-17 09:00:00.009' and ts < '2018-09-17 10:23:19.665' interval(30m)"
|
sql = "select avg(c1), avg(c2) from meters where ts >= '2018-09-17 09:00:00.009' and ts < '2018-09-17 10:23:19.665' interval(30m)"
|
||||||
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
|
ctxs.append(TSMAQCBuilder().with_sql(sql) \
|
||||||
.should_query_with_table('meters', '2018-09-17 09:00:00.009','2018-09-17 09:29:59.999') \
|
.should_query_with_table('meters', '2018-09-17 09:00:00.009','2018-09-17 09:29:59.999') \
|
||||||
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \
|
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \
|
||||||
.should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.664').get_ctx())
|
.should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.664').get_qc())
|
||||||
return ctxs
|
return ctxs
|
||||||
|
|
||||||
def test_query_with_tsma_interval_partition_by_tbname(self):
|
def test_query_with_tsma_interval_partition_by_tbname(self):
|
||||||
|
@ -394,42 +501,42 @@ class TDTestCase:
|
||||||
def test_query_with_tsma_agg_no_group_by(self):
|
def test_query_with_tsma_agg_no_group_by(self):
|
||||||
ctxs: List[TSMAQueryContext] = []
|
ctxs: List[TSMAQueryContext] = []
|
||||||
sql = 'select avg(c1), avg(c2) from meters'
|
sql = 'select avg(c1), avg(c2) from meters'
|
||||||
ctxs.append(TSMAQueryContextBuilder().with_sql(sql).should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
|
ctxs.append(TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc())
|
||||||
|
|
||||||
sql = 'select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.000" and "2018-09-17 10:00:00.000"'
|
sql = 'select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.000" and "2018-09-17 10:00:00.000"'
|
||||||
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
|
ctxs.append(TSMAQCBuilder().with_sql(sql) \
|
||||||
.should_query_with_tsma('tsma2', '2018-09-17 09:00:00','2018-09-17 09:59:59:999') \
|
.should_query_with_tsma('tsma2', '2018-09-17 09:00:00','2018-09-17 09:59:59:999') \
|
||||||
.should_query_with_table("meters", '2018-09-17 10:00:00','2018-09-17 10:00:00').get_ctx())
|
.should_query_with_table("meters", '2018-09-17 10:00:00','2018-09-17 10:00:00').get_qc())
|
||||||
|
|
||||||
sql = 'select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
|
sql = 'select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
|
||||||
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
|
ctxs.append(TSMAQCBuilder().with_sql(sql) \
|
||||||
.should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \
|
.should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \
|
||||||
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \
|
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \
|
||||||
.should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_ctx())
|
.should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_qc())
|
||||||
|
|
||||||
sql = 'select avg(c1) + avg(c2), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
|
sql = 'select avg(c1) + avg(c2), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
|
||||||
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
|
ctxs.append(TSMAQCBuilder().with_sql(sql) \
|
||||||
.should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \
|
.should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \
|
||||||
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \
|
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \
|
||||||
.should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_ctx())
|
.should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_qc())
|
||||||
|
|
||||||
sql = 'select avg(c1) + avg(c2), avg(c2) + 1 from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
|
sql = 'select avg(c1) + avg(c2), avg(c2) + 1 from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
|
||||||
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
|
ctxs.append(TSMAQCBuilder().with_sql(sql) \
|
||||||
.should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \
|
.should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \
|
||||||
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \
|
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \
|
||||||
.should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_ctx())
|
.should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_qc())
|
||||||
|
|
||||||
sql = 'select avg(c1) + avg(c2) from meters where tbname like "%t1%"'
|
sql = 'select avg(c1) + avg(c2) from meters where tbname like "%t1%"'
|
||||||
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
|
ctxs.append(TSMAQCBuilder().with_sql(sql) \
|
||||||
.should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
|
.should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc())
|
||||||
|
|
||||||
sql = 'select avg(c1), avg(c2) from meters where c1 is not NULL'
|
sql = 'select avg(c1), avg(c2) from meters where c1 is not NULL'
|
||||||
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
|
ctxs.append(TSMAQCBuilder().with_sql(sql) \
|
||||||
.should_query_with_table('meters', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
|
.should_query_with_table('meters', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc())
|
||||||
|
|
||||||
sql = 'select avg(c1), avg(c2), spread(c4) from meters'
|
sql = 'select avg(c1), avg(c2), spread(c4) from meters'
|
||||||
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
|
ctxs.append(TSMAQCBuilder().with_sql(sql) \
|
||||||
.should_query_with_table('meters', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
|
.should_query_with_table('meters', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc())
|
||||||
|
|
||||||
return ctxs
|
return ctxs
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue