Merge pull request #26492 from taosdata/enh/3.0/bigger_tsma_interval
Enh/3.0/bigger tsma interval
This commit is contained in:
commit
076fd07a26
|
@ -127,6 +127,9 @@ int32_t TEST_char2ts(const char* format, int64_t* ts, int32_t precision, const c
|
||||||
/// @return 0 success, other fail
|
/// @return 0 success, other fail
|
||||||
int32_t offsetOfTimezone(char* tzStr, int64_t* offset);
|
int32_t offsetOfTimezone(char* tzStr, int64_t* offset);
|
||||||
|
|
||||||
|
bool checkRecursiveTsmaInterval(int64_t baseInterval, int8_t baseUnit, int64_t interval, int8_t unit, int8_t precision,
|
||||||
|
bool checkEq);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -640,6 +640,7 @@ typedef struct SCreateTSMAStmt {
|
||||||
STSMAOptions* pOptions;
|
STSMAOptions* pOptions;
|
||||||
SNode* pPrevQuery;
|
SNode* pPrevQuery;
|
||||||
SMCreateSmaReq* pReq;
|
SMCreateSmaReq* pReq;
|
||||||
|
uint8_t precision;
|
||||||
} SCreateTSMAStmt;
|
} SCreateTSMAStmt;
|
||||||
|
|
||||||
typedef struct SDropTSMAStmt {
|
typedef struct SDropTSMAStmt {
|
||||||
|
|
|
@ -1969,3 +1969,98 @@ int32_t TEST_char2ts(const char* format, int64_t* ts, int32_t precision, const c
|
||||||
taosArrayDestroy(formats);
|
taosArrayDestroy(formats);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int8_t UNIT_INDEX[26] = {/*a*/ 2, 0, -1, 6, -1, -1, -1,
|
||||||
|
/*h*/ 5, -1, -1, -1, -1, 4, 8,
|
||||||
|
/*o*/ -1, -1, -1, -1, 3, -1,
|
||||||
|
/*u*/ 1, -1, 7, -1, 9, -1};
|
||||||
|
|
||||||
|
#define GET_UNIT_INDEX(idx) UNIT_INDEX[(idx) - 97]
|
||||||
|
|
||||||
|
static int64_t UNIT_MATRIX[10][11] = { /* ns, us, ms, s, min, h, d, w, month, y*/
|
||||||
|
/*ns*/ { 1, 1000, 0},
|
||||||
|
/*us*/ {1000, 1, 1000, 0},
|
||||||
|
/*ms*/ { 0, 1000, 1, 1000, 0},
|
||||||
|
/*s*/ { 0, 0, 1000, 1, 60, 0},
|
||||||
|
/*min*/ { 0, 0, 0, 60, 1, 60, 0},
|
||||||
|
/*h*/ { 0, 0, 0, 0, 60, 1, 1, 0},
|
||||||
|
/*d*/ { 0, 0, 0, 0, 0, 24, 1, 7, 1, 0},
|
||||||
|
/*w*/ { 0, 0, 0, 0, 0, 0, 7, 1, -1, 0},
|
||||||
|
/*mon*/ { 0, 0, 0, 0, 0, 0, 0, 0, 1, 12, 0},
|
||||||
|
/*y*/ { 0, 0, 0, 0, 0, 0, 0, 0, 12, 1, 0}};
|
||||||
|
|
||||||
|
static bool recursiveTsmaCheckRecursive(int64_t baseInterval, int8_t baseIdx, int64_t interval, int8_t idx, bool checkEq) {
|
||||||
|
if (UNIT_MATRIX[baseIdx][idx] == -1) return false;
|
||||||
|
if (baseIdx == idx) {
|
||||||
|
if (interval < baseInterval) return false;
|
||||||
|
if (checkEq && interval == baseInterval) return false;
|
||||||
|
return interval % baseInterval == 0;
|
||||||
|
}
|
||||||
|
int8_t next = baseIdx + 1;
|
||||||
|
int64_t val = UNIT_MATRIX[baseIdx][next];
|
||||||
|
while (val != 0 && next <= idx) {
|
||||||
|
if (val == -1) {
|
||||||
|
next++;
|
||||||
|
val = UNIT_MATRIX[baseIdx][next];
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (val % baseInterval == 0 || baseInterval % val == 0) {
|
||||||
|
int8_t extra = baseInterval >= val ? 0 : 1;
|
||||||
|
bool needCheckEq = baseInterval >= val && !(baseIdx < next && val == 1);
|
||||||
|
if (!recursiveTsmaCheckRecursive(baseInterval / val + extra, next, interval, idx, needCheckEq && checkEq)) {
|
||||||
|
next++;
|
||||||
|
val = UNIT_MATRIX[baseIdx][next];
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool recursiveTsmaCheckRecursiveReverse(int64_t baseInterval, int8_t baseIdx, int64_t interval, int8_t idx, bool checkEq) {
|
||||||
|
if (UNIT_MATRIX[baseIdx][idx] == -1) return false;
|
||||||
|
|
||||||
|
if (baseIdx == idx) {
|
||||||
|
if (interval < baseInterval) return false;
|
||||||
|
if (checkEq && interval == baseInterval) return false;
|
||||||
|
return interval % baseInterval == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int8_t next = baseIdx - 1;
|
||||||
|
int64_t val = UNIT_MATRIX[baseIdx][next];
|
||||||
|
while (val != 0 && next >= 0) {
|
||||||
|
return recursiveTsmaCheckRecursiveReverse(baseInterval * val, next, interval, idx, checkEq);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @breif check if tsma with param [interval], [unit] can create based on base tsma with baseInterval and baseUnit
|
||||||
|
* @param baseInterval, baseUnit, interval/unit of base tsma
|
||||||
|
* @param interval the tsma interval going to create. Not that if unit is not calander unit, then interval has already been
|
||||||
|
* translated to TICKS of [precision]
|
||||||
|
* @param unit the tsma unit going to create
|
||||||
|
* @param precision the precision of this db
|
||||||
|
* @param checkEq pass true if same interval is not acceptable, false if acceptable.
|
||||||
|
* @ret true the tsma can be created, else cannot
|
||||||
|
* */
|
||||||
|
bool checkRecursiveTsmaInterval(int64_t baseInterval, int8_t baseUnit, int64_t interval, int8_t unit, int8_t precision, bool checkEq) {
|
||||||
|
bool baseIsCalendarDuration = IS_CALENDAR_TIME_DURATION(baseUnit);
|
||||||
|
if (!baseIsCalendarDuration) baseInterval = convertTimeFromPrecisionToUnit(baseInterval, precision, baseUnit);
|
||||||
|
bool isCalendarDuration = IS_CALENDAR_TIME_DURATION(unit);
|
||||||
|
if (!isCalendarDuration) interval = convertTimeFromPrecisionToUnit(interval, precision, unit);
|
||||||
|
|
||||||
|
bool needCheckEq = baseIsCalendarDuration == isCalendarDuration && checkEq;
|
||||||
|
|
||||||
|
int8_t baseIdx = GET_UNIT_INDEX(baseUnit), idx = GET_UNIT_INDEX(unit);
|
||||||
|
if (baseIdx <= idx) {
|
||||||
|
return recursiveTsmaCheckRecursive(baseInterval, baseIdx, interval, idx, needCheckEq);
|
||||||
|
} else {
|
||||||
|
return recursiveTsmaCheckRecursiveReverse(baseInterval, baseIdx, interval, idx, checkEq);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
|
@ -2004,8 +2004,13 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
|
||||||
|
|
||||||
// interval
|
// interval
|
||||||
char interval[64 + VARSTR_HEADER_SIZE] = {0};
|
char interval[64 + VARSTR_HEADER_SIZE] = {0};
|
||||||
int32_t len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
|
int32_t len = 0;
|
||||||
getPrecisionUnit(pSrcDb->cfg.precision));
|
if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) {
|
||||||
|
len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
|
||||||
|
getPrecisionUnit(pSrcDb->cfg.precision));
|
||||||
|
} else {
|
||||||
|
len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit);
|
||||||
|
}
|
||||||
varDataSetLen(interval, len);
|
varDataSetLen(interval, len);
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, interval, false);
|
colDataSetVal(pColInfo, numOfRows, interval, false);
|
||||||
|
|
|
@ -2187,7 +2187,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
||||||
|
|
||||||
for (int i = 0; i < numOfTables; i++) {
|
for (int i = 0; i < numOfTables; i++) {
|
||||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
info->groupId = info->uid;
|
info->groupId = groupByTbname ? info->uid : 0;
|
||||||
|
|
||||||
taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
|
taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
|
||||||
sizeof(info->uid));
|
sizeof(info->uid));
|
||||||
|
|
|
@ -883,6 +883,8 @@ static uint8_t getPrecisionFromCurrStmt(SNode* pCurrStmt, uint8_t defaultVal) {
|
||||||
if (isDeleteStmt(pCurrStmt)) {
|
if (isDeleteStmt(pCurrStmt)) {
|
||||||
return ((SDeleteStmt*)pCurrStmt)->precision;
|
return ((SDeleteStmt*)pCurrStmt)->precision;
|
||||||
}
|
}
|
||||||
|
if (pCurrStmt && nodeType(pCurrStmt) == QUERY_NODE_CREATE_TSMA_STMT)
|
||||||
|
return ((SCreateTSMAStmt*)pCurrStmt)->precision;
|
||||||
return defaultVal;
|
return defaultVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -11135,22 +11137,41 @@ static int32_t rewriteTSMAFuncs(STranslateContext* pCxt, SCreateTSMAStmt* pStmt,
|
||||||
|
|
||||||
static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq,
|
static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq,
|
||||||
SName* useTbName) {
|
SName* useTbName) {
|
||||||
SName name;
|
SName name;
|
||||||
|
SDbCfgInfo pDbInfo = {0};
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), pReq->name);
|
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), pReq->name);
|
||||||
memset(&name, 0, sizeof(SName));
|
memset(&name, 0, sizeof(SName));
|
||||||
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, useTbName);
|
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, useTbName);
|
||||||
tNameExtractFullName(useTbName, pReq->stb);
|
tNameExtractFullName(useTbName, pReq->stb);
|
||||||
pReq->igExists = pStmt->ignoreExists;
|
pReq->igExists = pStmt->ignoreExists;
|
||||||
|
|
||||||
|
code = getDBCfg(pCxt, pStmt->dbName, &pDbInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
pStmt->precision = pDbInfo.precision;
|
||||||
|
code = translateValue(pCxt, (SValueNode*)pStmt->pOptions->pInterval);
|
||||||
|
if (code == DEAL_RES_ERROR) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
|
pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
|
||||||
pReq->intervalUnit = TIME_UNIT_MILLISECOND;
|
pReq->intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit;
|
||||||
|
|
||||||
#define TSMA_MIN_INTERVAL_MS 1000 * 60 // 1m
|
#define TSMA_MIN_INTERVAL_MS 1000 * 60 // 1m
|
||||||
#define TSMA_MAX_INTERVAL_MS (60 * 60 * 1000) // 1h
|
#define TSMA_MAX_INTERVAL_MS (60UL * 60UL * 1000UL * 24UL * 365UL) // 1y
|
||||||
if (pReq->interval > TSMA_MAX_INTERVAL_MS || pReq->interval < TSMA_MIN_INTERVAL_MS) {
|
|
||||||
return TSDB_CODE_TSMA_INVALID_INTERVAL;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
if (!IS_CALENDAR_TIME_DURATION(pReq->intervalUnit)) {
|
||||||
|
int64_t factor = TSDB_TICK_PER_SECOND(pDbInfo.precision) / TSDB_TICK_PER_SECOND(TSDB_TIME_PRECISION_MILLI);
|
||||||
|
if (pReq->interval > TSMA_MAX_INTERVAL_MS * factor || pReq->interval < TSMA_MIN_INTERVAL_MS * factor) {
|
||||||
|
return TSDB_CODE_TSMA_INVALID_INTERVAL;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pReq->intervalUnit == TIME_UNIT_MONTH && (pReq->interval < 1 || pReq->interval > 12))
|
||||||
|
return TSDB_CODE_TSMA_INVALID_INTERVAL;
|
||||||
|
if (pReq->intervalUnit == TIME_UNIT_YEAR && (pReq->interval != 1))
|
||||||
|
return TSDB_CODE_TSMA_INVALID_INTERVAL;
|
||||||
|
}
|
||||||
|
|
||||||
STableMeta* pTableMeta = NULL;
|
STableMeta* pTableMeta = NULL;
|
||||||
STableTSMAInfo* pRecursiveTsma = NULL;
|
STableTSMAInfo* pRecursiveTsma = NULL;
|
||||||
|
@ -11163,7 +11184,8 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
|
||||||
pReq->recursiveTsma = true;
|
pReq->recursiveTsma = true;
|
||||||
tNameExtractFullName(useTbName, pReq->baseTsmaName);
|
tNameExtractFullName(useTbName, pReq->baseTsmaName);
|
||||||
SValueNode* pInterval = (SValueNode*)pStmt->pOptions->pInterval;
|
SValueNode* pInterval = (SValueNode*)pStmt->pOptions->pInterval;
|
||||||
if (pRecursiveTsma->interval < pInterval->datum.i && pInterval->datum.i % pRecursiveTsma->interval == 0) {
|
if (checkRecursiveTsmaInterval(pRecursiveTsma->interval, pRecursiveTsma->unit, pInterval->datum.i,
|
||||||
|
pInterval->unit, pDbInfo.precision, true)) {
|
||||||
} else {
|
} else {
|
||||||
code = TSDB_CODE_TSMA_INVALID_PARA;
|
code = TSDB_CODE_TSMA_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
@ -11234,7 +11256,8 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateCreateTSMA(STranslateContext* pCxt, SCreateTSMAStmt* pStmt) {
|
static int32_t translateCreateTSMA(STranslateContext* pCxt, SCreateTSMAStmt* pStmt) {
|
||||||
int32_t code = doTranslateValue(pCxt, (SValueNode*)pStmt->pOptions->pInterval);
|
pCxt->pCurrStmt = (SNode*)pStmt;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SName useTbName = {0};
|
SName useTbName = {0};
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -6018,6 +6018,7 @@ typedef struct STSMAOptUsefulTsma {
|
||||||
SArray* pTsmaScanCols; // SArray<int32_t> index of tsmaFuncs array
|
SArray* pTsmaScanCols; // SArray<int32_t> index of tsmaFuncs array
|
||||||
char targetTbName[TSDB_TABLE_NAME_LEN]; // the scanning table name, used only when pTsma is not NULL
|
char targetTbName[TSDB_TABLE_NAME_LEN]; // the scanning table name, used only when pTsma is not NULL
|
||||||
uint64_t targetTbUid; // the scanning table uid, used only when pTsma is not NULL
|
uint64_t targetTbUid; // the scanning table uid, used only when pTsma is not NULL
|
||||||
|
int8_t precision;
|
||||||
} STSMAOptUsefulTsma;
|
} STSMAOptUsefulTsma;
|
||||||
|
|
||||||
typedef struct STSMAOptCtx {
|
typedef struct STSMAOptCtx {
|
||||||
|
@ -6085,12 +6086,19 @@ static void clearTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx) {
|
||||||
taosMemoryFreeClear(pTsmaOptCtx->queryInterval);
|
taosMemoryFreeClear(pTsmaOptCtx->queryInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t tsmaIntevalUnit, const STSMAOptCtx* pTsmaOptCtx) {
|
static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t unit, const STSMAOptCtx* pTsmaOptCtx) {
|
||||||
if (!pTsmaOptCtx->queryInterval) return true;
|
if (!pTsmaOptCtx->queryInterval) return true;
|
||||||
|
|
||||||
bool validInterval = pTsmaOptCtx->queryInterval->interval % tsmaInterval == 0;
|
bool validInterval = checkRecursiveTsmaInterval(tsmaInterval, unit, pTsmaOptCtx->queryInterval->interval,
|
||||||
bool validSliding = pTsmaOptCtx->queryInterval->sliding % tsmaInterval == 0;
|
pTsmaOptCtx->queryInterval->intervalUnit,
|
||||||
bool validOffset = pTsmaOptCtx->queryInterval->offset % tsmaInterval == 0;
|
pTsmaOptCtx->queryInterval->precision, false);
|
||||||
|
bool validSliding =
|
||||||
|
checkRecursiveTsmaInterval(tsmaInterval, unit, pTsmaOptCtx->queryInterval->sliding,
|
||||||
|
pTsmaOptCtx->queryInterval->slidingUnit, pTsmaOptCtx->queryInterval->precision, false);
|
||||||
|
bool validOffset =
|
||||||
|
pTsmaOptCtx->queryInterval->offset == 0 ||
|
||||||
|
checkRecursiveTsmaInterval(tsmaInterval, unit, pTsmaOptCtx->queryInterval->offset,
|
||||||
|
pTsmaOptCtx->queryInterval->offsetUnit, pTsmaOptCtx->queryInterval->precision, false);
|
||||||
return validInterval && validSliding && validOffset;
|
return validInterval && validSliding && validOffset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6171,7 +6179,8 @@ static bool tsmaOptCheckTags(STSMAOptCtx* pCtx, const STableTSMAInfo* pTsma) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
|
static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
|
||||||
STSMAOptUsefulTsma usefulTsma = {.pTsma = NULL, .scanRange = {.skey = TSKEY_MIN, .ekey = TSKEY_MAX}};
|
STSMAOptUsefulTsma usefulTsma = {
|
||||||
|
.pTsma = NULL, .scanRange = {.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, .precision = pTsmaOptCtx->precision};
|
||||||
SArray* pTsmaScanCols = NULL;
|
SArray* pTsmaScanCols = NULL;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pTsmaOptCtx->pTsmas->size; ++i) {
|
for (int32_t i = 0; i < pTsmaOptCtx->pTsmas->size; ++i) {
|
||||||
|
@ -6208,31 +6217,26 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRight) {
|
static int32_t tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRight) {
|
||||||
|
const int64_t factors[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
|
||||||
const STSMAOptUsefulTsma *p = pLeft, *q = pRight;
|
const STSMAOptUsefulTsma *p = pLeft, *q = pRight;
|
||||||
int64_t pInterval = p->pTsma->interval, qInterval = q->pTsma->interval;
|
int64_t pInterval = p->pTsma->interval, qInterval = q->pTsma->interval;
|
||||||
int32_t code = getDuration(pInterval, p->pTsma->unit, &pInterval, TSDB_TIME_PRECISION_MILLI);
|
int8_t pUnit = p->pTsma->unit, qUnit = q->pTsma->unit;
|
||||||
ASSERT(code == TSDB_CODE_SUCCESS);
|
if (TIME_UNIT_MONTH == pUnit) {
|
||||||
code = getDuration(qInterval, q->pTsma->unit, &qInterval, TSDB_TIME_PRECISION_MILLI);
|
pInterval = pInterval * 31 * (NANOSECOND_PER_DAY / factors[p->precision]);
|
||||||
ASSERT(code == TSDB_CODE_SUCCESS);
|
} else if (TIME_UNIT_YEAR == pUnit){
|
||||||
|
pInterval = pInterval * 365 * (NANOSECOND_PER_DAY / factors[p->precision]);
|
||||||
|
}
|
||||||
|
if (TIME_UNIT_MONTH == qUnit) {
|
||||||
|
qInterval = qInterval * 31 * (NANOSECOND_PER_DAY / factors[q->precision]);
|
||||||
|
} else if (TIME_UNIT_YEAR == qUnit){
|
||||||
|
qInterval = qInterval * 365 * (NANOSECOND_PER_DAY / factors[q->precision]);
|
||||||
|
}
|
||||||
|
|
||||||
if (pInterval > qInterval) return -1;
|
if (pInterval > qInterval) return -1;
|
||||||
if (pInterval < qInterval) return 1;
|
if (pInterval < qInterval) return 1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static const STSMAOptUsefulTsma* tsmaOptFindUsefulTsma(const SArray* pUsefulTsmas, int32_t startIdx,
|
|
||||||
int64_t alignInterval, int64_t alignInterval2,
|
|
||||||
int8_t precision) {
|
|
||||||
int64_t tsmaInterval;
|
|
||||||
for (int32_t i = startIdx; i < pUsefulTsmas->size; ++i) {
|
|
||||||
const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pUsefulTsmas, i);
|
|
||||||
getDuration(pUsefulTsma->pTsma->interval, pUsefulTsma->pTsma->unit, &tsmaInterval, precision);
|
|
||||||
if (alignInterval % tsmaInterval == 0 && alignInterval2 % tsmaInterval == 0) {
|
|
||||||
return pUsefulTsma;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tsmaOptInitIntervalFromTsma(SInterval* pInterval, const STableTSMAInfo* pTsma, int8_t precision) {
|
static void tsmaOptInitIntervalFromTsma(SInterval* pInterval, const STableTSMAInfo* pTsma, int8_t precision) {
|
||||||
pInterval->interval = pTsma->interval;
|
pInterval->interval = pTsma->interval;
|
||||||
pInterval->intervalUnit = pTsma->unit;
|
pInterval->intervalUnit = pTsma->unit;
|
||||||
|
@ -6243,14 +6247,28 @@ static void tsmaOptInitIntervalFromTsma(SInterval* pInterval, const STableTSMAIn
|
||||||
pInterval->precision = precision;
|
pInterval->precision = precision;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static const STSMAOptUsefulTsma* tsmaOptFindUsefulTsma(const SArray* pUsefulTsmas, int32_t startIdx,
|
||||||
|
int64_t startAlignInterval, int64_t endAlignInterval,
|
||||||
|
int8_t precision) {
|
||||||
|
SInterval tsmaInterval;
|
||||||
|
for (int32_t i = startIdx; i < pUsefulTsmas->size; ++i) {
|
||||||
|
const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pUsefulTsmas, i);
|
||||||
|
tsmaOptInitIntervalFromTsma(&tsmaInterval, pUsefulTsma->pTsma, precision);
|
||||||
|
if (taosTimeTruncate(startAlignInterval, &tsmaInterval) == startAlignInterval &&
|
||||||
|
taosTimeTruncate(endAlignInterval, &tsmaInterval) == endAlignInterval) {
|
||||||
|
return pUsefulTsma;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pScanRange) {
|
static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pScanRange) {
|
||||||
bool needTailWindow = false;
|
bool needTailWindow = false;
|
||||||
bool isSkeyAlignedWithTsma = true, isEkeyAlignedWithTsma = true;
|
bool isSkeyAlignedWithTsma = true, isEkeyAlignedWithTsma = true;
|
||||||
int64_t winSkey = TSKEY_MIN, winEkey = TSKEY_MAX;
|
int64_t winSkey = TSKEY_MIN, winEkey = TSKEY_MAX;
|
||||||
int64_t startOfSkeyFirstWin = pScanRange->skey, endOfSkeyFirstWin;
|
int64_t startOfSkeyFirstWin = pScanRange->skey, endOfSkeyFirstWin;
|
||||||
int64_t startOfEkeyFirstWin = pScanRange->ekey, endOfEkeyFirstWin;
|
int64_t startOfEkeyFirstWin = pScanRange->ekey, endOfEkeyFirstWin;
|
||||||
int64_t tsmaInterval;
|
SInterval interval, tsmaInterval;
|
||||||
SInterval interval;
|
|
||||||
STimeWindow scanRange = *pScanRange;
|
STimeWindow scanRange = *pScanRange;
|
||||||
const SInterval* pInterval = pTsmaOptCtx->queryInterval;
|
const SInterval* pInterval = pTsmaOptCtx->queryInterval;
|
||||||
const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pTsmaOptCtx->pUsefulTsmas, 0);
|
const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pTsmaOptCtx->pUsefulTsmas, 0);
|
||||||
|
@ -6263,14 +6281,14 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
|
||||||
pInterval = &interval;
|
pInterval = &interval;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsmaInterval = pTsma->interval;
|
tsmaOptInitIntervalFromTsma(&tsmaInterval, pTsma, pTsmaOptCtx->precision);
|
||||||
|
|
||||||
// check for head windows
|
// check for head windows
|
||||||
if (pScanRange->skey != TSKEY_MIN) {
|
if (pScanRange->skey != TSKEY_MIN) {
|
||||||
startOfSkeyFirstWin = taosTimeTruncate(pScanRange->skey, pInterval);
|
startOfSkeyFirstWin = taosTimeTruncate(pScanRange->skey, pInterval);
|
||||||
endOfSkeyFirstWin =
|
endOfSkeyFirstWin =
|
||||||
taosTimeAdd(startOfSkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pTsmaOptCtx->precision);
|
taosTimeAdd(startOfSkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pTsmaOptCtx->precision);
|
||||||
isSkeyAlignedWithTsma = ((pScanRange->skey - startOfSkeyFirstWin) % tsmaInterval == 0);
|
isSkeyAlignedWithTsma = taosTimeTruncate(pScanRange->skey, &tsmaInterval) == pScanRange->skey;
|
||||||
} else {
|
} else {
|
||||||
endOfSkeyFirstWin = TSKEY_MIN;
|
endOfSkeyFirstWin = TSKEY_MIN;
|
||||||
}
|
}
|
||||||
|
@ -6280,7 +6298,7 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
|
||||||
startOfEkeyFirstWin = taosTimeTruncate(pScanRange->ekey, pInterval);
|
startOfEkeyFirstWin = taosTimeTruncate(pScanRange->ekey, pInterval);
|
||||||
endOfEkeyFirstWin =
|
endOfEkeyFirstWin =
|
||||||
taosTimeAdd(startOfEkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pTsmaOptCtx->precision);
|
taosTimeAdd(startOfEkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pTsmaOptCtx->precision);
|
||||||
isEkeyAlignedWithTsma = ((pScanRange->ekey + 1 - startOfEkeyFirstWin) % tsmaInterval == 0);
|
isEkeyAlignedWithTsma = taosTimeTruncate(pScanRange->ekey + 1, &tsmaInterval) == (pScanRange->ekey + 1);
|
||||||
if (startOfEkeyFirstWin > startOfSkeyFirstWin) {
|
if (startOfEkeyFirstWin > startOfSkeyFirstWin) {
|
||||||
needTailWindow = true;
|
needTailWindow = true;
|
||||||
}
|
}
|
||||||
|
@ -6292,8 +6310,7 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
|
||||||
scanRange.ekey,
|
scanRange.ekey,
|
||||||
taosTimeAdd(startOfSkeyFirstWin, pInterval->interval * 1, pInterval->intervalUnit, pTsmaOptCtx->precision) - 1);
|
taosTimeAdd(startOfSkeyFirstWin, pInterval->interval * 1, pInterval->intervalUnit, pTsmaOptCtx->precision) - 1);
|
||||||
const STSMAOptUsefulTsma* pTsmaFound =
|
const STSMAOptUsefulTsma* pTsmaFound =
|
||||||
tsmaOptFindUsefulTsma(pTsmaOptCtx->pUsefulTsmas, 1, scanRange.skey - startOfSkeyFirstWin,
|
tsmaOptFindUsefulTsma(pTsmaOptCtx->pUsefulTsmas, 1, scanRange.skey, scanRange.ekey + 1, pTsmaOptCtx->precision);
|
||||||
(scanRange.ekey + 1 - startOfSkeyFirstWin), pTsmaOptCtx->precision);
|
|
||||||
STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsmaFound ? pTsmaFound->pTsma : NULL,
|
STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsmaFound ? pTsmaFound->pTsma : NULL,
|
||||||
.scanRange = scanRange,
|
.scanRange = scanRange,
|
||||||
.pTsmaScanCols = pTsmaFound ? pTsmaFound->pTsmaScanCols : NULL};
|
.pTsmaScanCols = pTsmaFound ? pTsmaFound->pTsmaScanCols : NULL};
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <tutil.h>
|
#include <tutil.h>
|
||||||
#include <random>
|
#include <random>
|
||||||
|
#include "ttime.h"
|
||||||
|
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
|
@ -382,3 +383,94 @@ TEST(utilTest, intToHextStr) {
|
||||||
ASSERT_STREQ(buf, destBuf);
|
ASSERT_STREQ(buf, destBuf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
static int64_t getIntervalValWithPrecision(int64_t interval, int8_t unit, int8_t precision) {
|
||||||
|
if (IS_CALENDAR_TIME_DURATION(unit)) {
|
||||||
|
return interval;
|
||||||
|
}
|
||||||
|
if(0 != getDuration(interval, unit, &interval, precision)) {
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
return interval;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool tsmaIntervalCheck(int64_t baseInterval, int8_t baseUnit, int64_t interval, int8_t unit, int8_t precision) {
|
||||||
|
auto ret = checkRecursiveTsmaInterval(getIntervalValWithPrecision(baseInterval, baseUnit, precision), baseUnit,
|
||||||
|
getIntervalValWithPrecision(interval, unit, precision), unit, precision, true);
|
||||||
|
using namespace std;
|
||||||
|
cout << interval << unit << " on " << baseInterval << baseUnit << ": " << ret << endl;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(tsma, reverse_unit) {
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(1, 'm', 120, 's', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(1, 'h', 120, 'm', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(20, 's', 2 * 20 * 1000, 'a', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(20, 's', 2 * 20 * 1000 * 1000, 'u', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(20, 's', 2UL * 20UL * 1000UL * 1000UL * 1000UL, 'b', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(1, 'h', 60, 'm', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(1, 'h', 6, 'm', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(2, 'h', 120, 'm', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(2, 'h', 240, 'm', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(1, 'd', 240, 'm', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(1, 'd', 1440, 'm', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(1, 'd', 2880, 'm', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(1, 'y', 365, 'd', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(1, 'n', 30, 'd', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(1, 'y', 24, 'n', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(55, 's', 55, 'm', TSDB_TIME_PRECISION_MILLI));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(10, 's', 1, 'm', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(10, 's', 2, 'm', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(10, 's', 20, 'm', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(10, 's', 50, 'm', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(120, 's', 30, 'm', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(360, 's', 30, 'm', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(600, 's', 30, 'm', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(600, 's', 15, 'm', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(10, 's', 1, 'h', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(15, 's', 1, 'h', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(7*60, 's', 1, 'h', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(10, 's', 1, 'd', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(10, 's', 1, 'w', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(1, 'd', 1, 'w', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(10, 's', 1, 'n', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(10, 's', 1, 'y', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(1, 'd', 1, 'w', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(1, 'd', 1, 'n', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(1, 'd', 2, 'n', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(2, 'd', 2, 'n', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(2, 'd', 2, 'y', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(2, 'd', 1, 'y', TSDB_TIME_PRECISION_MICRO));
|
||||||
|
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(1, 'w', 1, 'n', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(4, 'w', 1, 'n', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(1, 'w', 1, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(1, 'n', 1, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(2, 'n', 1, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(3, 'n', 1, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(4, 'n', 1, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(5, 'n', 1, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(6, 'n', 1, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(7, 'n', 1, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(8, 'n', 1, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(9, 'n', 1, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(10, 'n', 1, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(11, 'n', 1, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(1, 'w', 1, 'w', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(120, 's', 2, 'm', TSDB_TIME_PRECISION_NANO));
|
||||||
|
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(2, 'n', 2, 'n', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(2, 'y', 2, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_FALSE(tsmaIntervalCheck(12, 'n', 1, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
ASSERT_TRUE(tsmaIntervalCheck(3, 'n', 1, 'y', TSDB_TIME_PRECISION_NANO));
|
||||||
|
}
|
||||||
|
|
|
@ -131,6 +131,11 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma.py -Q 2
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma.py -Q 2
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma.py -Q 3
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma.py -Q 3
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma.py -Q 4
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma.py -Q 4
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma2.py
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma2.py -R
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma2.py -Q 2
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma2.py -Q 3
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma2.py -Q 4
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
|
||||||
|
|
|
@ -687,10 +687,10 @@ class TDTestCase:
|
||||||
tdLog.debug("insert data ............ [OK]")
|
tdLog.debug("insert data ............ [OK]")
|
||||||
return
|
return
|
||||||
|
|
||||||
def init_data(self, ctb_num: int = 10, rows_per_ctb: int = 10000, start_ts: int = 1537146000000, ts_step: int = 500):
|
def init_data(self, db: str = 'test', ctb_num: int = 10, rows_per_ctb: int = 10000, start_ts: int = 1537146000000, ts_step: int = 500):
|
||||||
tdLog.printNoPrefix(
|
tdLog.printNoPrefix(
|
||||||
"======== prepare test env include database, stable, ctables, and insert data: ")
|
"======== prepare test env include database, stable, ctables, and insert data: ")
|
||||||
paraDict = {'dbName': 'test',
|
paraDict = {'dbName': db,
|
||||||
'dropFlag': 1,
|
'dropFlag': 1,
|
||||||
'vgroups': 2,
|
'vgroups': 2,
|
||||||
'stbName': 'meters',
|
'stbName': 'meters',
|
||||||
|
@ -707,8 +707,8 @@ class TDTestCase:
|
||||||
'tsStep': ts_step}
|
'tsStep': ts_step}
|
||||||
|
|
||||||
paraDict['vgroups'] = self.vgroups
|
paraDict['vgroups'] = self.vgroups
|
||||||
paraDict['ctbNum'] = self.ctbNum
|
paraDict['ctbNum'] = ctb_num
|
||||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
paraDict['rowsPerTbl'] = rows_per_ctb
|
||||||
|
|
||||||
tdLog.info("create database")
|
tdLog.info("create database")
|
||||||
self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"],
|
self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"],
|
||||||
|
@ -972,16 +972,16 @@ class TDTestCase:
|
||||||
sql = 'select avg(c2), "recursive test.tsma4" from test.meters'
|
sql = 'select avg(c2), "recursive test.tsma4" from test.meters'
|
||||||
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma(
|
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma(
|
||||||
'tsma4', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()
|
'tsma4', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()
|
||||||
#time.sleep(999999)
|
|
||||||
self.tsma_tester.check_sql(sql, ctx)
|
self.tsma_tester.check_sql(sql, ctx)
|
||||||
self.check(self.test_query_tsma_all(select_func_list))
|
self.check(self.test_query_tsma_all(select_func_list))
|
||||||
self.create_recursive_tsma(
|
self.create_recursive_tsma(
|
||||||
'tsma4', 'tsma6', 'test', '1h', 'meters', tsma_func_list)
|
'tsma4', 'tsma6', 'test', '5h', 'meters', tsma_func_list)
|
||||||
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma(
|
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma(
|
||||||
'tsma6', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()
|
'tsma6', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()
|
||||||
self.tsma_tester.check_sql(sql, ctx)
|
self.tsma_tester.check_sql(sql, ctx)
|
||||||
|
|
||||||
self.check(self.test_query_tsma_all(select_func_list))
|
self.check(self.test_query_tsma_all(select_func_list))
|
||||||
|
#time.sleep(999999)
|
||||||
|
|
||||||
tdSql.error('drop tsma test.tsma3', -2147482491)
|
tdSql.error('drop tsma test.tsma3', -2147482491)
|
||||||
tdSql.error('drop tsma test.tsma4', -2147482491)
|
tdSql.error('drop tsma test.tsma4', -2147482491)
|
||||||
|
@ -1217,7 +1217,6 @@ class TDTestCase:
|
||||||
self.test_create_tsma()
|
self.test_create_tsma()
|
||||||
self.test_drop_tsma()
|
self.test_drop_tsma()
|
||||||
self.test_tb_ddl_with_created_tsma()
|
self.test_tb_ddl_with_created_tsma()
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.init_data()
|
self.init_data()
|
||||||
|
@ -1358,18 +1357,19 @@ class TDTestCase:
|
||||||
'create table nsdb.meters(ts timestamp, c1 int, c2 int, c3 varchar(255)) tags(t1 int, t2 int)', queryTimes=1)
|
'create table nsdb.meters(ts timestamp, c1 int, c2 int, c3 varchar(255)) tags(t1 int, t2 int)', queryTimes=1)
|
||||||
self.create_tsma('tsma1', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
|
self.create_tsma('tsma1', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
|
||||||
# Invalid tsma interval, 1ms ~ 1h is allowed
|
# Invalid tsma interval, 1ms ~ 1h is allowed
|
||||||
tdSql.error(
|
def _():
|
||||||
'create tsma tsma2 on nsdb.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
|
tdSql.error(
|
||||||
tdSql.error(
|
'create tsma tsma2 on nsdb.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
|
||||||
'create tsma tsma2 on nsdb.meters function(avg(c1), avg(c2)) interval(3601s)', -2147471097)
|
tdSql.error(
|
||||||
tdSql.error(
|
'create tsma tsma2 on nsdb.meters function(avg(c1), avg(c2)) interval(3601s)', -2147471097)
|
||||||
'create tsma tsma2 on nsdb.meters function(avg(c1), avg(c2)) interval(3600001a)', -2147471097)
|
tdSql.error(
|
||||||
tdSql.error(
|
'create tsma tsma2 on nsdb.meters function(avg(c1), avg(c2)) interval(3600001a)', -2147471097)
|
||||||
'create tsma tsma2 on nsdb.meters function(avg(c1), avg(c2)) interval(3600001000u)', -2147471097)
|
tdSql.error(
|
||||||
tdSql.error(
|
'create tsma tsma2 on nsdb.meters function(avg(c1), avg(c2)) interval(3600001000u)', -2147471097)
|
||||||
'create tsma tsma2 on nsdb.meters function(avg(c1), avg(c2)) interval(999999b)', -2147471097)
|
tdSql.error(
|
||||||
tdSql.error(
|
'create tsma tsma2 on nsdb.meters function(avg(c1), avg(c2)) interval(999999b)', -2147471097)
|
||||||
'create tsma tsma2 on nsdb.meters function(avg(c1), avg(c2)) interval(999u)', -2147471097)
|
tdSql.error(
|
||||||
|
'create tsma tsma2 on nsdb.meters function(avg(c1), avg(c2)) interval(999u)', -2147471097)
|
||||||
# invalid tsma func param
|
# invalid tsma func param
|
||||||
tdSql.error(
|
tdSql.error(
|
||||||
'create tsma tsma2 on nsdb.meters function(avg(c1, c2), avg(c2)) interval(10m)', -2147471096)
|
'create tsma tsma2 on nsdb.meters function(avg(c1, c2), avg(c2)) interval(10m)', -2147471096)
|
||||||
|
@ -1446,8 +1446,6 @@ class TDTestCase:
|
||||||
['avg(c1)', 'avg(c2)'], 'nsdb', 'meters', '10m', 'tsma1')
|
['avg(c1)', 'avg(c2)'], 'nsdb', 'meters', '10m', 'tsma1')
|
||||||
tdSql.execute('drop tsma nsdb.tsma1', queryTimes=1)
|
tdSql.execute('drop tsma nsdb.tsma1', queryTimes=1)
|
||||||
|
|
||||||
tdSql.error(
|
|
||||||
'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
|
|
||||||
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u')
|
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u')
|
||||||
tdSql.execute('drop database nsdb')
|
tdSql.execute('drop database nsdb')
|
||||||
|
|
||||||
|
@ -1611,7 +1609,6 @@ class TDTestCase:
|
||||||
|
|
||||||
# def test_split_dnode(self):
|
# def test_split_dnode(self):
|
||||||
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
tdLog.success(f"{__file__} successfully executed")
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
|
@ -0,0 +1,909 @@
|
||||||
|
from random import randrange
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import secrets
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
from util.common import *
|
||||||
|
# from tmqCommon import *
|
||||||
|
|
||||||
|
ROUND = 100
|
||||||
|
|
||||||
|
ignore_some_tests: int = 1
|
||||||
|
|
||||||
|
class TSMA:
|
||||||
|
def __init__(self):
|
||||||
|
self.tsma_name = ''
|
||||||
|
self.db_name = ''
|
||||||
|
self.original_table_name = ''
|
||||||
|
self.funcs = []
|
||||||
|
self.cols = []
|
||||||
|
self.interval: str = ''
|
||||||
|
|
||||||
|
|
||||||
|
class UsedTsma:
|
||||||
|
TS_MIN = '-9223372036854775808'
|
||||||
|
TS_MAX = '9223372036854775806'
|
||||||
|
TSMA_RES_STB_POSTFIX = '_tsma_res_stb_'
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.name = '' # tsma name or table name
|
||||||
|
self.time_range_start: float = float(UsedTsma.TS_MIN)
|
||||||
|
self.time_range_end: float = float(UsedTsma.TS_MAX)
|
||||||
|
self.is_tsma_ = False
|
||||||
|
|
||||||
|
def __eq__(self, __value: object) -> bool:
|
||||||
|
if isinstance(__value, self.__class__):
|
||||||
|
return self.name == __value.name \
|
||||||
|
and self.time_range_start == __value.time_range_start \
|
||||||
|
and self.time_range_end == __value.time_range_end \
|
||||||
|
and self.is_tsma_ == __value.is_tsma_
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def __ne__(self, __value: object) -> bool:
|
||||||
|
return not self.__eq__(__value)
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return "%s: from %s to %s is_tsma: %d" % (self.name, self.time_range_start, self.time_range_end, self.is_tsma_)
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
return self.__str__()
|
||||||
|
|
||||||
|
def setIsTsma(self):
|
||||||
|
self.is_tsma_ = self.name.endswith(self.TSMA_RES_STB_POSTFIX)
|
||||||
|
if not self.is_tsma_:
|
||||||
|
self.is_tsma_ = len(self.name) == 32 # for tsma output child table
|
||||||
|
|
||||||
|
class TSMAQueryContext:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.sql = ''
|
||||||
|
self.used_tsmas: List[UsedTsma] = []
|
||||||
|
self.ignore_tsma_check_ = False
|
||||||
|
self.ignore_res_order_ = False
|
||||||
|
|
||||||
|
def __eq__(self, __value) -> bool:
|
||||||
|
if isinstance(__value, self.__class__):
|
||||||
|
if self.ignore_tsma_check_ or __value.ignore_tsma_check_:
|
||||||
|
return True
|
||||||
|
if len(self.used_tsmas) != len(__value.used_tsmas):
|
||||||
|
return False
|
||||||
|
for used_tsma1, used_tsma2 in zip(self.used_tsmas, __value.used_tsmas):
|
||||||
|
if not used_tsma1 == used_tsma2:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def __ne__(self, __value: object) -> bool:
|
||||||
|
return self.__eq__(__value)
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return str(self.used_tsmas)
|
||||||
|
|
||||||
|
def has_tsma(self) -> bool:
|
||||||
|
for tsma in self.used_tsmas:
|
||||||
|
if tsma.is_tsma_:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class TSMAQCBuilder:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.qc_: TSMAQueryContext = TSMAQueryContext()
|
||||||
|
|
||||||
|
def get_qc(self) -> TSMAQueryContext:
|
||||||
|
return self.qc_
|
||||||
|
|
||||||
|
def with_sql(self, sql: str):
|
||||||
|
self.qc_.sql = sql
|
||||||
|
return self
|
||||||
|
|
||||||
|
def to_timestamp(self, ts: str) -> float:
|
||||||
|
if ts == UsedTsma.TS_MAX or ts == UsedTsma.TS_MIN:
|
||||||
|
return float(ts)
|
||||||
|
tdSql.query(
|
||||||
|
"select to_timestamp('%s', 'yyyy-mm-dd hh24-mi-ss.ms')" % (ts))
|
||||||
|
res = tdSql.queryResult[0][0]
|
||||||
|
return res.timestamp() * 1000
|
||||||
|
|
||||||
|
def md5(self, buf: str) -> str:
|
||||||
|
tdSql.query(f'select md5("{buf}")')
|
||||||
|
res = tdSql.queryResult[0][0]
|
||||||
|
return res
|
||||||
|
|
||||||
|
def should_query_with_table(self, tb_name: str, ts_begin: str = UsedTsma.TS_MIN, ts_end: str = UsedTsma.TS_MAX) -> 'TSMAQCBuilder':
|
||||||
|
used_tsma: UsedTsma = UsedTsma()
|
||||||
|
used_tsma.name = tb_name
|
||||||
|
used_tsma.time_range_start = self.to_timestamp(ts_begin)
|
||||||
|
used_tsma.time_range_end = self.to_timestamp(ts_end)
|
||||||
|
used_tsma.is_tsma_ = False
|
||||||
|
self.qc_.used_tsmas.append(used_tsma)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def should_query_with_tsma_ctb(self, db_name: str, tsma_name: str, ctb_name: str, ts_begin: str = UsedTsma.TS_MIN, ts_end: str = UsedTsma.TS_MAX) -> 'TSMAQCBuilder':
|
||||||
|
used_tsma: UsedTsma = UsedTsma()
|
||||||
|
name = f'1.{db_name}.{tsma_name}_{ctb_name}'
|
||||||
|
used_tsma.name = self.md5(name)
|
||||||
|
used_tsma.time_range_start = self.to_timestamp(ts_begin)
|
||||||
|
used_tsma.time_range_end = self.to_timestamp(ts_end)
|
||||||
|
used_tsma.is_tsma_ = True
|
||||||
|
self.qc_.used_tsmas.append(used_tsma)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def ignore_query_table(self):
|
||||||
|
self.qc_.ignore_tsma_check_ = True
|
||||||
|
return self
|
||||||
|
|
||||||
|
def ignore_res_order(self, ignore: bool):
|
||||||
|
self.qc_.ignore_res_order_ = ignore
|
||||||
|
return self
|
||||||
|
|
||||||
|
def should_query_with_tsma(self, tsma_name: str, ts_begin: str = UsedTsma.TS_MIN, ts_end: str = UsedTsma.TS_MAX, child_tb: bool = False) -> 'TSMAQCBuilder':
|
||||||
|
used_tsma: UsedTsma = UsedTsma()
|
||||||
|
if child_tb:
|
||||||
|
used_tsma.name = tsma_name
|
||||||
|
else:
|
||||||
|
used_tsma.name = tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX
|
||||||
|
used_tsma.time_range_start = self.to_timestamp(ts_begin)
|
||||||
|
used_tsma.time_range_end = self.to_timestamp(ts_end)
|
||||||
|
used_tsma.is_tsma_ = True
|
||||||
|
self.qc_.used_tsmas.append(used_tsma)
|
||||||
|
return self
|
||||||
|
|
||||||
|
|
||||||
|
class TSMATester:
|
||||||
|
def __init__(self, tdSql: TDSql) -> None:
|
||||||
|
self.tsmas = []
|
||||||
|
self.tdSql: TDSql = tdSql
|
||||||
|
|
||||||
|
def explain_sql(self, sql: str):
|
||||||
|
tdSql.execute("alter local 'querySmaOptimize' '1'")
|
||||||
|
sql = "explain verbose true " + sql
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
res = self.tdSql.queryResult
|
||||||
|
if self.tdSql.queryResult is None:
|
||||||
|
raise
|
||||||
|
return res
|
||||||
|
|
||||||
|
def get_tsma_query_ctx(self, sql: str):
|
||||||
|
explain_res = self.explain_sql(sql)
|
||||||
|
query_ctx: TSMAQueryContext = TSMAQueryContext()
|
||||||
|
query_ctx.sql = sql
|
||||||
|
query_ctx.used_tsmas = []
|
||||||
|
used_tsma: UsedTsma = UsedTsma()
|
||||||
|
for row in explain_res:
|
||||||
|
row = str(row)
|
||||||
|
if len(used_tsma.name) == 0:
|
||||||
|
idx = row.find("Table Scan on ")
|
||||||
|
if idx >= 0:
|
||||||
|
words = row[idx:].split(' ')
|
||||||
|
used_tsma.name = words[3]
|
||||||
|
used_tsma.setIsTsma()
|
||||||
|
else:
|
||||||
|
idx = row.find('Time Range:')
|
||||||
|
if idx >= 0:
|
||||||
|
row = row[idx:].split('[')[1]
|
||||||
|
row = row.split(']')[0]
|
||||||
|
words = row.split(',')
|
||||||
|
used_tsma.time_range_start = float(words[0].strip())
|
||||||
|
used_tsma.time_range_end = float(words[1].strip())
|
||||||
|
query_ctx.used_tsmas.append(used_tsma)
|
||||||
|
used_tsma = UsedTsma()
|
||||||
|
|
||||||
|
deduplicated_tsmas: list[UsedTsma] = []
|
||||||
|
if len(query_ctx.used_tsmas) > 0:
|
||||||
|
deduplicated_tsmas.append(query_ctx.used_tsmas[0])
|
||||||
|
for tsma in query_ctx.used_tsmas:
|
||||||
|
if tsma == deduplicated_tsmas[-1]:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
deduplicated_tsmas.append(tsma)
|
||||||
|
query_ctx.used_tsmas = deduplicated_tsmas
|
||||||
|
|
||||||
|
return query_ctx
|
||||||
|
|
||||||
|
def check_explain(self, sql: str, expect: TSMAQueryContext) -> TSMAQueryContext:
|
||||||
|
query_ctx = self.get_tsma_query_ctx(sql)
|
||||||
|
if not query_ctx == expect:
|
||||||
|
tdLog.exit('check explain failed for sql: %s \nexpect: %s \nactual: %s' % (
|
||||||
|
sql, str(expect), str(query_ctx)))
|
||||||
|
elif expect.has_tsma():
|
||||||
|
tdLog.debug('check explain succeed for sql: %s \ntsma: %s' %
|
||||||
|
(sql, str(expect.used_tsmas)))
|
||||||
|
has_tsma = False
|
||||||
|
for tsma in query_ctx.used_tsmas:
|
||||||
|
has_tsma = has_tsma or tsma.is_tsma_
|
||||||
|
if not has_tsma and len(query_ctx.used_tsmas) > 1:
|
||||||
|
tdLog.exit(
|
||||||
|
f'explain err for sql: {sql}, has multi non tsmas, {query_ctx.used_tsmas}')
|
||||||
|
return query_ctx
|
||||||
|
|
||||||
|
def check_result(self, sql: str, skip_order: bool = False):
|
||||||
|
tdSql.execute("alter local 'querySmaOptimize' '1'")
|
||||||
|
tsma_res = tdSql.getResult(sql)
|
||||||
|
|
||||||
|
tdSql.execute("alter local 'querySmaOptimize' '0'")
|
||||||
|
no_tsma_res = tdSql.getResult(sql)
|
||||||
|
|
||||||
|
if no_tsma_res is None or tsma_res is None:
|
||||||
|
if no_tsma_res != tsma_res:
|
||||||
|
tdLog.exit("comparing tsma res for: %s got different rows of result: without tsma: %s, with tsma: %s" % (
|
||||||
|
sql, str(no_tsma_res), str(tsma_res)))
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
|
if len(no_tsma_res) != len(tsma_res):
|
||||||
|
tdLog.exit("comparing tsma res for: %s got different rows of result: \nwithout tsma: %s\nwith tsma: %s" % (
|
||||||
|
sql, str(no_tsma_res), str(tsma_res)))
|
||||||
|
if skip_order:
|
||||||
|
try:
|
||||||
|
no_tsma_res.sort(
|
||||||
|
key=lambda x: [v is None for v in x] + list(x))
|
||||||
|
tsma_res.sort(key=lambda x: [v is None for v in x] + list(x))
|
||||||
|
except Exception as e:
|
||||||
|
tdLog.exit("comparing tsma res for: %s got different data: \nno tsma res: %s \n tsma res: %s err: %s" % (
|
||||||
|
sql, str(no_tsma_res), str(tsma_res), str(e)))
|
||||||
|
|
||||||
|
for row_no_tsma, row_tsma in zip(no_tsma_res, tsma_res):
|
||||||
|
if row_no_tsma != row_tsma:
|
||||||
|
tdLog.exit("comparing tsma res for: %s got different row data: no tsma row: %s, tsma row: %s \nno tsma res: %s \n tsma res: %s" % (
|
||||||
|
sql, str(row_no_tsma), str(row_tsma), str(no_tsma_res), str(tsma_res)))
|
||||||
|
tdLog.info('result check succeed for sql: %s. \n tsma-res: %s. \nno_tsma-res: %s' %
|
||||||
|
(sql, str(tsma_res), str(no_tsma_res)))
|
||||||
|
|
||||||
|
def check_sql(self, sql: str, expect: TSMAQueryContext):
|
||||||
|
tdLog.debug(f"start to check sql: {sql}")
|
||||||
|
actual_ctx = self.check_explain(sql, expect=expect)
|
||||||
|
tdLog.debug(f"ctx: {actual_ctx}")
|
||||||
|
if actual_ctx.has_tsma():
|
||||||
|
self.check_result(sql, expect.ignore_res_order_)
|
||||||
|
|
||||||
|
def check_sqls(self, sqls, expects):
|
||||||
|
for sql, query_ctx in zip(sqls, expects):
|
||||||
|
self.check_sql(sql, query_ctx)
|
||||||
|
|
||||||
|
|
||||||
|
class TSMATesterSQLGeneratorOptions:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.ts_min: int = 1537146000000 - 1000 * 60 * 60
|
||||||
|
self.ts_max: int = 1537150999000 + 1000 * 60 * 60
|
||||||
|
self.times: int = 100
|
||||||
|
self.pk_col: str = 'ts'
|
||||||
|
self.column_prefix: str = 'c'
|
||||||
|
self.column_num: int = 9 # c1 - c10
|
||||||
|
self.tags_prefix: str = 't'
|
||||||
|
self.tag_num: int = 6 # t1 - t6
|
||||||
|
self.str_tag_idx: List = [2, 3]
|
||||||
|
self.child_table_name_prefix: str = 't'
|
||||||
|
self.child_table_num: int = 10 # t0 - t9
|
||||||
|
self.interval: bool = False
|
||||||
|
# 70% generating a partition by, 30% no partition by, same as group by
|
||||||
|
self.partition_by: bool = False
|
||||||
|
self.group_by: bool = False
|
||||||
|
# generating no ts range condition is also possible
|
||||||
|
self.where_ts_range: bool = False
|
||||||
|
self.where_tbname_func: bool = False
|
||||||
|
self.where_tag_func: bool = False
|
||||||
|
self.where_col_func: bool = False
|
||||||
|
self.slimit_max = 10
|
||||||
|
self.limit_max = 10
|
||||||
|
self.norm_tb = False
|
||||||
|
|
||||||
|
|
||||||
|
class TSMATesterSQLGeneratorRes:
|
||||||
|
def __init__(self):
|
||||||
|
self.has_where_ts_range: bool = False
|
||||||
|
self.has_interval: bool = False
|
||||||
|
self.partition_by: bool = False
|
||||||
|
self.group_by: bool = False
|
||||||
|
self.has_slimit: bool = False
|
||||||
|
self.has_limit: bool = False
|
||||||
|
self.has_user_order_by: bool = False
|
||||||
|
|
||||||
|
def can_ignore_res_order(self):
|
||||||
|
return not (self.has_limit and self.has_slimit)
|
||||||
|
|
||||||
|
|
||||||
|
class TSMATestSQLGenerator:
|
||||||
|
def __init__(self, opts: TSMATesterSQLGeneratorOptions = TSMATesterSQLGeneratorOptions()):
|
||||||
|
self.db_name_: str = ''
|
||||||
|
self.tb_name_: str = ''
|
||||||
|
self.ts_scan_range_: List[float] = [
|
||||||
|
float(UsedTsma.TS_MIN), float(UsedTsma.TS_MAX)]
|
||||||
|
self.agg_funcs_: List[str] = []
|
||||||
|
self.tsmas_: List[TSMA] = [] # currently created tsmas
|
||||||
|
self.opts_: TSMATesterSQLGeneratorOptions = opts
|
||||||
|
self.res_: TSMATesterSQLGeneratorRes = TSMATesterSQLGeneratorRes()
|
||||||
|
|
||||||
|
self.select_list_: List[str] = []
|
||||||
|
self.where_list_: List[str] = []
|
||||||
|
self.group_or_partition_by_list: List[str] = []
|
||||||
|
self.interval: str = ''
|
||||||
|
|
||||||
|
def get_depth_one_str_funcs(self, name: str) -> List[str]:
|
||||||
|
concat1 = f'CONCAT({name}, "_concat")'
|
||||||
|
concat2 = f'CONCAT({name}, {name})'
|
||||||
|
concat3 = f'CONCAT({name}, {name}, {name})'
|
||||||
|
start = random.randint(1, 3)
|
||||||
|
len = random.randint(0, 3)
|
||||||
|
substr = f'SUBSTR({name}, {start}, {len})'
|
||||||
|
lower = f'LOWER({name})'
|
||||||
|
ltrim = f'LTRIM({name})'
|
||||||
|
return [concat1, concat2, concat3, substr, substr, lower, lower, ltrim, name]
|
||||||
|
|
||||||
|
def generate_depthed_str_func(self, name: str, depth: int) -> str:
|
||||||
|
if depth == 1:
|
||||||
|
return random.choice(self.get_depth_one_str_funcs(name))
|
||||||
|
name = self.generate_depthed_str_func(name, depth - 1)
|
||||||
|
return random.choice(self.get_depth_one_str_funcs(name))
|
||||||
|
|
||||||
|
def generate_str_func(self, column_name: str, depth: int = 0) -> str:
|
||||||
|
if depth == 0:
|
||||||
|
depth = random.randint(1, 3)
|
||||||
|
|
||||||
|
ret = self.generate_depthed_str_func(column_name, depth)
|
||||||
|
tdLog.debug(f'generating str func: {ret}')
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def get_random_type(self, funcs):
|
||||||
|
rand: int = randrange(1, len(funcs))
|
||||||
|
return funcs[rand-1]()
|
||||||
|
|
||||||
|
def generate_select_list(self, user_select_list: str, partition_by_list: str):
|
||||||
|
res = user_select_list
|
||||||
|
if self.res_.has_interval and random.random() < 0.8:
|
||||||
|
res = res + ',_wstart, _wend'
|
||||||
|
if self.res_.partition_by or self.res_.group_by and random.random() < 0.8:
|
||||||
|
res = res + f',{partition_by_list}'
|
||||||
|
return res
|
||||||
|
|
||||||
|
def generate_order_by(self, user_order_by: str, partition_by_list: str):
|
||||||
|
auto_order_by = 'ORDER BY'
|
||||||
|
has_limit = self.res_.has_limit or self.res_.has_slimit
|
||||||
|
if has_limit and (self.res_.group_by or self.res_.partition_by):
|
||||||
|
auto_order_by = f'{auto_order_by} {partition_by_list},'
|
||||||
|
if has_limit and self.res_.has_interval:
|
||||||
|
auto_order_by = f'{auto_order_by} _wstart, _wend,'
|
||||||
|
if len(user_order_by) > 0:
|
||||||
|
self.res_.has_user_order_by = True
|
||||||
|
auto_order_by = f'{auto_order_by} {user_order_by},'
|
||||||
|
if auto_order_by == 'ORDER BY':
|
||||||
|
return ''
|
||||||
|
else:
|
||||||
|
return auto_order_by[:-1]
|
||||||
|
|
||||||
|
def generate_one(self, select_list: str, possible_tbs: List, order_by_list: str, interval_list: List[str] = []) -> str:
|
||||||
|
tb = random.choice(possible_tbs)
|
||||||
|
where = self.generate_where()
|
||||||
|
interval = self.generate_interval(interval_list)
|
||||||
|
(partition_by, partition_by_list) = self.generate_partition_by()
|
||||||
|
limit = self.generate_limit()
|
||||||
|
auto_select_list = self.generate_select_list(
|
||||||
|
select_list, partition_by_list)
|
||||||
|
order_by = self.generate_order_by(order_by_list, partition_by_list)
|
||||||
|
sql = f"SELECT {auto_select_list} FROM {tb} {where} {partition_by} {partition_by_list} {interval} {order_by} {limit}"
|
||||||
|
tdLog.debug(sql)
|
||||||
|
return sql
|
||||||
|
|
||||||
|
def can_ignore_res_order(self):
|
||||||
|
return self.res_.can_ignore_res_order()
|
||||||
|
|
||||||
|
def generate_where(self) -> str:
|
||||||
|
v = random.random()
|
||||||
|
where = ''
|
||||||
|
if not self.opts_.norm_tb:
|
||||||
|
if v < 0.2:
|
||||||
|
where = f'{self.generate_tbname_where()}'
|
||||||
|
elif v < 0.5:
|
||||||
|
where = f'{self.generate_tag_where()}'
|
||||||
|
elif v < 0.7:
|
||||||
|
op = random.choice(['AND', 'OR'])
|
||||||
|
where = f'{self.generate_tbname_where()} {op} {self.generate_tag_where()}'
|
||||||
|
ts_where = self.generate_ts_where_range()
|
||||||
|
if len(ts_where) > 0 or len(where) > 0:
|
||||||
|
op = ''
|
||||||
|
if len(where) > 0 and len(ts_where) > 0:
|
||||||
|
op = random.choice(['AND', 'AND', 'AND', 'AND', 'OR'])
|
||||||
|
return f'WHERE {ts_where} {op} {where}'
|
||||||
|
return ''
|
||||||
|
|
||||||
|
def generate_str_equal_operator(self, column_name: str, opts: List) -> str:
|
||||||
|
opt = random.choice(opts)
|
||||||
|
return f'{column_name} = "{opt}"'
|
||||||
|
|
||||||
|
# TODO support it
|
||||||
|
def generate_str_in_operator(self, column_name: str, opts: List) -> str:
|
||||||
|
opt = random.choice(opts)
|
||||||
|
IN = f'"{",".join(opts)}"'
|
||||||
|
return f'{column_name} in ({IN})'
|
||||||
|
|
||||||
|
def generate_str_like_operator(self, column_name: str, opts: List) -> str:
|
||||||
|
opt = random.choice(opts)
|
||||||
|
return f'{column_name} like "{opt}"'
|
||||||
|
|
||||||
|
def generate_tbname_where(self) -> str:
|
||||||
|
tbs = []
|
||||||
|
for idx in range(1, self.opts_.tag_num + 1):
|
||||||
|
tbs.append(f'{self.opts_.child_table_name_prefix}{idx}')
|
||||||
|
|
||||||
|
if random.random() < 0.5:
|
||||||
|
return self.generate_str_equal_operator('tbname', tbs)
|
||||||
|
else:
|
||||||
|
return self.generate_str_like_operator('tbname', ['t%', '%2'])
|
||||||
|
|
||||||
|
def generate_tag_where(self) -> str:
|
||||||
|
idx = random.randrange(1, self.opts_.tag_num + 1)
|
||||||
|
if random.random() < 0.5 and idx in self.opts_.str_tag_idx:
|
||||||
|
if random.random() < 0.5:
|
||||||
|
return self.generate_str_equal_operator(f'{self.opts_.tags_prefix}{idx}', [f'tb{random.randint(1,100)}'])
|
||||||
|
else:
|
||||||
|
return self.generate_str_like_operator(f'{self.opts_.tags_prefix}{idx}', ['%1', 'tb%', 'tb1%', '%1%'])
|
||||||
|
else:
|
||||||
|
operator = random.choice(['>', '>=', '<', '<=', '=', '!='])
|
||||||
|
val = random.randint(1, 100)
|
||||||
|
return f'{self.opts_.tags_prefix}{idx} {operator} {val}'
|
||||||
|
|
||||||
|
def generate_timestamp(self, min: float = -1, max: float = 0) -> int:
|
||||||
|
milliseconds_aligned: float = random.randint(int(min), int(max))
|
||||||
|
seconds_aligned = int(milliseconds_aligned / 1000) * 1000
|
||||||
|
if seconds_aligned < min:
|
||||||
|
seconds_aligned = int(min)
|
||||||
|
minutes_aligned = int(milliseconds_aligned / 1000 / 60) * 1000 * 60
|
||||||
|
if minutes_aligned < min:
|
||||||
|
minutes_aligned = int(min)
|
||||||
|
hour_aligned = int(milliseconds_aligned / 1000 /
|
||||||
|
60 / 60) * 1000 * 60 * 60
|
||||||
|
if hour_aligned < min:
|
||||||
|
hour_aligned = int(min)
|
||||||
|
|
||||||
|
return random.choice([milliseconds_aligned, seconds_aligned, seconds_aligned, minutes_aligned, minutes_aligned, hour_aligned, hour_aligned])
|
||||||
|
|
||||||
|
def generate_ts_where_range(self):
|
||||||
|
if not self.opts_.where_ts_range:
|
||||||
|
return ''
|
||||||
|
left_operators = ['>', '>=', '']
|
||||||
|
right_operators = ['<', '<=', '']
|
||||||
|
left_operator = left_operators[random.randrange(0, 3)]
|
||||||
|
right_operator = right_operators[random.randrange(0, 3)]
|
||||||
|
a = ''
|
||||||
|
left_value = None
|
||||||
|
if left_operator:
|
||||||
|
left_value = self.generate_timestamp(
|
||||||
|
self.opts_.ts_min, self.opts_.ts_max)
|
||||||
|
a += f'{self.opts_.pk_col} {left_operator} {left_value}'
|
||||||
|
if right_operator:
|
||||||
|
if left_value:
|
||||||
|
start = left_value
|
||||||
|
else:
|
||||||
|
start = self.opts_.ts_min
|
||||||
|
right_value = self.generate_timestamp(start, self.opts_.ts_max)
|
||||||
|
if left_operator:
|
||||||
|
a += ' AND '
|
||||||
|
a += f'{self.opts_.pk_col} {right_operator} {right_value}'
|
||||||
|
# tdLog.debug(f'{self.opts_.pk_col} range with: {a}')
|
||||||
|
if len(a) > 0:
|
||||||
|
self.res_.has_where_ts_range = True
|
||||||
|
return a
|
||||||
|
|
||||||
|
def generate_limit(self) -> str:
|
||||||
|
ret = ''
|
||||||
|
can_have_slimit = self.res_.partition_by or self.res_.group_by
|
||||||
|
if can_have_slimit:
|
||||||
|
if random.random() < 0.4:
|
||||||
|
ret = f'SLIMIT {random.randint(0, self.opts_.slimit_max)}'
|
||||||
|
self.res_.has_slimit = True
|
||||||
|
if random.random() < 0.4:
|
||||||
|
self.res_.has_limit = True
|
||||||
|
ret = ret + f' LIMIT {random.randint(0, self.opts_.limit_max)}'
|
||||||
|
return ret
|
||||||
|
|
||||||
|
## if offset is True, offset cannot be the same as interval
|
||||||
|
def generate_random_offset_sliding(self, interval: str, offset: bool = False) -> str:
|
||||||
|
unit = interval[-1]
|
||||||
|
hasUnit = unit.isalpha()
|
||||||
|
if not hasUnit:
|
||||||
|
start = 1
|
||||||
|
if offset:
|
||||||
|
start = 2
|
||||||
|
ret: int = int(int(interval) / random.randint(start, 5))
|
||||||
|
return str(ret)
|
||||||
|
return ''
|
||||||
|
|
||||||
|
# add sliding offset
|
||||||
|
def generate_interval(self, intervals: List[str]) -> str:
|
||||||
|
if not self.opts_.interval:
|
||||||
|
return ''
|
||||||
|
if random.random() < 0.4: # no interval
|
||||||
|
return ''
|
||||||
|
value = random.choice(intervals)
|
||||||
|
self.res_.has_interval = True
|
||||||
|
has_offset = False
|
||||||
|
offset = ''
|
||||||
|
has_sliding = False
|
||||||
|
sliding = ''
|
||||||
|
num: int = int(value[:-1])
|
||||||
|
unit = value[-1]
|
||||||
|
if has_offset and num > 1:
|
||||||
|
offset = f', {self.generate_random_offset_sliding(value, True)}'
|
||||||
|
if has_sliding:
|
||||||
|
sliding = f'sliding({self.generate_random_offset_sliding(value)})'
|
||||||
|
return f'INTERVAL({value} {offset}) {sliding}'
|
||||||
|
|
||||||
|
def generate_tag_list(self):
|
||||||
|
used_tag_num = random.randrange(1, self.opts_.tag_num + 1)
|
||||||
|
ret = ''
|
||||||
|
for _ in range(used_tag_num):
|
||||||
|
tag_idx = random.randint(1, self.opts_.tag_num)
|
||||||
|
tag_name = self.opts_.tags_prefix + f'{tag_idx}'
|
||||||
|
if random.random() < 0.5 and tag_idx in self.opts_.str_tag_idx:
|
||||||
|
tag_func = self.generate_str_func(tag_name, 2)
|
||||||
|
else:
|
||||||
|
tag_func = tag_name
|
||||||
|
ret = ret + f'{tag_func},'
|
||||||
|
return ret[:-1]
|
||||||
|
|
||||||
|
def generate_tbname_tag_list(self):
|
||||||
|
tag_num = random.randrange(1, self.opts_.tag_num)
|
||||||
|
ret = ''
|
||||||
|
tbname_idx = random.randint(0, tag_num + 1)
|
||||||
|
for i in range(tag_num + 1):
|
||||||
|
if i == tbname_idx:
|
||||||
|
ret = ret + 'tbname,'
|
||||||
|
else:
|
||||||
|
tag_idx = random.randint(1, self.opts_.tag_num)
|
||||||
|
ret = ret + self.opts_.tags_prefix + f'{tag_idx},'
|
||||||
|
return ret[:-1]
|
||||||
|
|
||||||
|
def generate_partition_by(self):
|
||||||
|
if not self.opts_.partition_by and not self.opts_.group_by:
|
||||||
|
return ('', '')
|
||||||
|
# no partition or group
|
||||||
|
if random.random() < 0.3:
|
||||||
|
return ('', '')
|
||||||
|
ret = ''
|
||||||
|
rand = random.random()
|
||||||
|
if rand < 0.4:
|
||||||
|
if random.random() < 0.5:
|
||||||
|
ret = self.generate_str_func('tbname', 3)
|
||||||
|
else:
|
||||||
|
ret = 'tbname'
|
||||||
|
elif rand < 0.8:
|
||||||
|
ret = self.generate_tag_list()
|
||||||
|
else:
|
||||||
|
# tbname and tag
|
||||||
|
ret = self.generate_tbname_tag_list()
|
||||||
|
# tdLog.debug(f'partition by: {ret}')
|
||||||
|
if self.res_.has_interval or random.random() < 0.5:
|
||||||
|
self.res_.partition_by = True
|
||||||
|
return (str('PARTITION BY'), f'{ret}')
|
||||||
|
else:
|
||||||
|
self.res_.group_by = True
|
||||||
|
return (str('GROUP BY'), f'{ret}')
|
||||||
|
|
||||||
|
def generate_where_tbname(self) -> str:
|
||||||
|
return self.generate_str_func('tbname')
|
||||||
|
|
||||||
|
def generate_where_tag(self) -> str:
|
||||||
|
# tag_idx = random.randint(1, self.opts_.tag_num)
|
||||||
|
# tag = self.opts_.tags_prefix + str(tag_idx)
|
||||||
|
return self.generate_str_func('t3')
|
||||||
|
|
||||||
|
def generate_where_conditions(self) -> str:
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
# generate func in tsmas(select list)
|
||||||
|
def _generate_agg_func_for_select(self) -> str:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# order by, limit, having, subquery...
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
updatecfgDict = {'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4, 'maxTsmaNum': 3}
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.vgroups = 4
|
||||||
|
self.ctbNum = 10
|
||||||
|
self.rowsPerTbl = 10000
|
||||||
|
self.duraion = '1h'
|
||||||
|
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor(), False)
|
||||||
|
self.tsma_tester: TSMATester = TSMATester(tdSql)
|
||||||
|
self.tsma_sql_generator: TSMATestSQLGenerator = TSMATestSQLGenerator()
|
||||||
|
|
||||||
|
def create_database(self, tsql, dbName, dropFlag=1, vgroups=2, replica=1, duration: str = '1d'):
|
||||||
|
if dropFlag == 1:
|
||||||
|
tsql.execute("drop database if exists %s" % (dbName))
|
||||||
|
|
||||||
|
tsql.execute("create database if not exists %s vgroups %d replica %d duration %s" % (
|
||||||
|
dbName, vgroups, replica, duration))
|
||||||
|
tdLog.debug("complete to create database %s" % (dbName))
|
||||||
|
return
|
||||||
|
|
||||||
|
def create_stable(self, tsql, paraDict):
|
||||||
|
colString = tdCom.gen_column_type_str(
|
||||||
|
colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"])
|
||||||
|
tagString = tdCom.gen_tag_type_str(
|
||||||
|
tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"])
|
||||||
|
sqlString = f"create table if not exists %s.%s (%s) tags (%s)" % (
|
||||||
|
paraDict["dbName"], paraDict["stbName"], colString, tagString)
|
||||||
|
tdLog.debug("%s" % (sqlString))
|
||||||
|
tsql.execute(sqlString)
|
||||||
|
return
|
||||||
|
|
||||||
|
def create_ctable(self, tsql=None, dbName='dbx', stbName='stb', ctbPrefix='ctb', ctbNum=1, ctbStartIdx=0):
|
||||||
|
for i in range(ctbNum):
|
||||||
|
sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % (dbName, ctbPrefix, i+ctbStartIdx, dbName, stbName, (i+ctbStartIdx) % 5, i+ctbStartIdx + random.randint(
|
||||||
|
1, 100), i+ctbStartIdx + random.randint(1, 100), i+ctbStartIdx + random.randint(1, 100), i+ctbStartIdx + random.randint(1, 100), i+ctbStartIdx + random.randint(1, 100))
|
||||||
|
tsql.execute(sqlString)
|
||||||
|
|
||||||
|
tdLog.debug("complete to create %d child tables by %s.%s" %
|
||||||
|
(ctbNum, dbName, stbName))
|
||||||
|
return
|
||||||
|
|
||||||
|
def init_normal_tb(self, tsql, db_name: str, tb_name: str, rows: int, start_ts: int, ts_step: int):
|
||||||
|
sql = 'CREATE TABLE %s.%s (ts timestamp, c1 INT, c2 INT, c3 INT, c4 double, c5 VARCHAR(255))' % (
|
||||||
|
db_name, tb_name)
|
||||||
|
tsql.execute(sql)
|
||||||
|
sql = 'INSERT INTO %s.%s values' % (db_name, tb_name)
|
||||||
|
for j in range(rows):
|
||||||
|
sql += f'(%d, %d,%d,%d,{random.random()},"varchar_%d"),' % (start_ts + j * ts_step + randrange(500), j %
|
||||||
|
10 + randrange(200), j % 10, j % 10, j % 10 + randrange(100))
|
||||||
|
tsql.execute(sql)
|
||||||
|
|
||||||
|
def insert_data(self, tsql, dbName, ctbPrefix, ctbNum, rowsPerTbl, batchNum, startTs, tsStep):
|
||||||
|
tdLog.debug("start to insert data ............")
|
||||||
|
tsql.execute("use %s" % dbName)
|
||||||
|
pre_insert = "insert into "
|
||||||
|
sql = pre_insert
|
||||||
|
|
||||||
|
for i in range(ctbNum):
|
||||||
|
rowsBatched = 0
|
||||||
|
sql += " %s.%s%d values " % (dbName, ctbPrefix, i)
|
||||||
|
for j in range(rowsPerTbl):
|
||||||
|
if (i < ctbNum/2):
|
||||||
|
sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') " % (startTs + j*tsStep + randrange(
|
||||||
|
500), j % 10 + randrange(100), j % 10 + randrange(200), j % 10, j % 10, j % 10, j % 10, j % 10, j % 10)
|
||||||
|
else:
|
||||||
|
sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') " % (
|
||||||
|
startTs + j*tsStep + randrange(500), j % 10, j % 10, j % 10, j % 10, j % 10, j % 10)
|
||||||
|
rowsBatched += 1
|
||||||
|
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
|
||||||
|
tsql.execute(sql)
|
||||||
|
rowsBatched = 0
|
||||||
|
if j < rowsPerTbl - 1:
|
||||||
|
sql = "insert into %s.%s%d values " % (dbName, ctbPrefix, i)
|
||||||
|
else:
|
||||||
|
sql = "insert into "
|
||||||
|
if sql != pre_insert:
|
||||||
|
tsql.execute(sql)
|
||||||
|
tdLog.debug("insert data ............ [OK]")
|
||||||
|
return
|
||||||
|
|
||||||
|
def init_data(self, db: str = 'test', ctb_num: int = 10, rows_per_ctb: int = 10000, start_ts: int = 1537146000000, ts_step: int = 500):
|
||||||
|
tdLog.printNoPrefix(
|
||||||
|
"======== prepare test env include database, stable, ctables, and insert data: ")
|
||||||
|
paraDict = {'dbName': db,
|
||||||
|
'dropFlag': 1,
|
||||||
|
'vgroups': 2,
|
||||||
|
'stbName': 'meters',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count': 1}, {'type': 'BIGINT', 'count': 1}, {'type': 'FLOAT', 'count': 1}, {'type': 'DOUBLE', 'count': 1}, {'type': 'smallint', 'count': 1}, {'type': 'tinyint', 'count': 1}, {'type': 'bool', 'count': 1}, {'type': 'binary', 'len': 10, 'count': 1}, {'type': 'nchar', 'len': 10, 'count': 1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count': 1}, {'type': 'nchar', 'len': 20, 'count': 1}, {'type': 'binary', 'len': 20, 'count': 1}, {'type': 'BIGINT', 'count': 1}, {'type': 'smallint', 'count': 1}, {'type': 'DOUBLE', 'count': 1}],
|
||||||
|
'ctbPrefix': 't',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': ctb_num,
|
||||||
|
'rowsPerTbl': rows_per_ctb,
|
||||||
|
'batchNum': 3000,
|
||||||
|
'startTs': start_ts,
|
||||||
|
'tsStep': ts_step}
|
||||||
|
|
||||||
|
paraDict['vgroups'] = self.vgroups
|
||||||
|
paraDict['ctbNum'] = ctb_num
|
||||||
|
paraDict['rowsPerTbl'] = rows_per_ctb
|
||||||
|
|
||||||
|
tdLog.info("create database")
|
||||||
|
self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"],
|
||||||
|
vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion)
|
||||||
|
|
||||||
|
tdLog.info("create stb")
|
||||||
|
self.create_stable(tsql=tdSql, paraDict=paraDict)
|
||||||
|
|
||||||
|
tdLog.info("create child tables")
|
||||||
|
self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"],
|
||||||
|
stbName=paraDict["stbName"], ctbPrefix=paraDict["ctbPrefix"],
|
||||||
|
ctbNum=paraDict["ctbNum"], ctbStartIdx=paraDict["ctbStartIdx"])
|
||||||
|
self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],
|
||||||
|
ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],
|
||||||
|
rowsPerTbl=paraDict["rowsPerTbl"], batchNum=paraDict["batchNum"],
|
||||||
|
startTs=paraDict["startTs"], tsStep=paraDict["tsStep"])
|
||||||
|
self.init_normal_tb(tdSql, paraDict['dbName'], 'norm_tb',
|
||||||
|
paraDict['rowsPerTbl'], paraDict['startTs'], paraDict['tsStep'])
|
||||||
|
|
||||||
|
def wait_for_tsma_calculation(self, func_list: list, db: str, tb: str, interval: str, tsma_name: str, timeout_seconds: int =600):
|
||||||
|
start_time = time.time()
|
||||||
|
while True:
|
||||||
|
current_time = time.time()
|
||||||
|
if current_time - start_time > timeout_seconds:
|
||||||
|
error_message = f"Timeout occurred while waiting for TSMA calculation to complete."
|
||||||
|
tdLog.exit(error_message)
|
||||||
|
sql = 'select %s from %s.%s interval(%s)' % (
|
||||||
|
', '.join(func_list), db, tb, interval)
|
||||||
|
tdLog.debug(
|
||||||
|
f'waiting for tsma {db}.{tsma_name} to be useful with sql {sql}')
|
||||||
|
ctx: TSMAQueryContext = self.tsma_tester.get_tsma_query_ctx(sql)
|
||||||
|
if ctx.has_tsma():
|
||||||
|
if ctx.used_tsmas[0].name == tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX:
|
||||||
|
break
|
||||||
|
elif len(ctx.used_tsmas[0].name) == 32:
|
||||||
|
name = f'1.{db}.{tsma_name}_{tb}'
|
||||||
|
if ctx.used_tsmas[0].name == TSMAQCBuilder().md5(name):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
time.sleep(1)
|
||||||
|
else:
|
||||||
|
time.sleep(1)
|
||||||
|
else:
|
||||||
|
time.sleep(1)
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
def create_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, interval: str, check_tsma_calculation : str=True):
|
||||||
|
tdSql.execute('use %s' % db)
|
||||||
|
sql = "CREATE TSMA %s ON %s.%s FUNCTION(%s) INTERVAL(%s)" % (
|
||||||
|
tsma_name, db, tb, ','.join(func_list), interval)
|
||||||
|
tdSql.execute(sql, queryTimes=1)
|
||||||
|
if check_tsma_calculation == True:
|
||||||
|
self.wait_for_tsma_calculation(func_list, db, tb, interval, tsma_name)
|
||||||
|
|
||||||
|
def create_error_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, interval: str, expectedErrno: int):
|
||||||
|
tdSql.execute('use %s' % db)
|
||||||
|
sql = "CREATE TSMA %s ON %s.%s FUNCTION(%s) INTERVAL(%s)" % (
|
||||||
|
tsma_name, db, tb, ','.join(func_list), interval)
|
||||||
|
tdSql.error(sql, expectedErrno)
|
||||||
|
|
||||||
|
def create_recursive_tsma(self, base_tsma_name: str, new_tsma_name: str, db: str, interval: str, tb_name: str, func_list: List[str] = ['avg(c1)']):
|
||||||
|
tdSql.execute('use %s' % db, queryTimes=1)
|
||||||
|
sql = 'CREATE RECURSIVE TSMA %s ON %s.%s INTERVAL(%s)' % (
|
||||||
|
new_tsma_name, db, base_tsma_name, interval)
|
||||||
|
tdSql.execute(sql, queryTimes=1)
|
||||||
|
self.wait_for_tsma_calculation(
|
||||||
|
func_list, db, tb_name, interval, new_tsma_name)
|
||||||
|
|
||||||
|
def drop_tsma(self, tsma_name: str, db: str):
|
||||||
|
sql = 'DROP TSMA %s.%s' % (db, tsma_name)
|
||||||
|
tdSql.execute(sql, queryTimes=1)
|
||||||
|
|
||||||
|
def check_explain_res_has_row(self, plan_str_expect: str, explain_output):
|
||||||
|
plan_found = False
|
||||||
|
for row in explain_output:
|
||||||
|
if str(row).find(plan_str_expect) >= 0:
|
||||||
|
tdLog.debug("plan: [%s] found in: [%s]" %
|
||||||
|
(plan_str_expect, str(row)))
|
||||||
|
plan_found = True
|
||||||
|
break
|
||||||
|
if not plan_found:
|
||||||
|
tdLog.exit("plan: %s not found in res: [%s]" % (
|
||||||
|
plan_str_expect, str(explain_output)))
|
||||||
|
|
||||||
|
def check(self, ctxs: List):
|
||||||
|
for ctx in ctxs:
|
||||||
|
self.tsma_tester.check_sql(ctx.sql, ctx)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.test_bigger_tsma_interval()
|
||||||
|
|
||||||
|
def test_create_recursive_tsma_interval(self, db: str, tb: str, func, interval: str, recursive_interval: str, succ: bool, code: int):
|
||||||
|
self.create_tsma('tsma1', db, tb, func, interval)
|
||||||
|
sql = f'CREATE RECURSIVE TSMA tsma2 ON {db}.tsma1 INTERVAL({recursive_interval})'
|
||||||
|
if not succ:
|
||||||
|
tdSql.error(sql, code)
|
||||||
|
else:
|
||||||
|
self.create_recursive_tsma('tsma1', 'tsma2', db, recursive_interval, tb, func)
|
||||||
|
self.drop_tsma('tsma2', db)
|
||||||
|
self.drop_tsma('tsma1', db)
|
||||||
|
|
||||||
|
def test_bigger_tsma_interval_query(self, func_list: List):
|
||||||
|
## 3 tsmas, 12h, 1n, 1y
|
||||||
|
ctxs = []
|
||||||
|
interval_list = ['2h', '8h', '1d', '1n', '3n', '1w', '1y', '2y']
|
||||||
|
opts: TSMATesterSQLGeneratorOptions = TSMATesterSQLGeneratorOptions()
|
||||||
|
opts.interval = True
|
||||||
|
opts.where_ts_range = True
|
||||||
|
for _ in range(1, ROUND):
|
||||||
|
opts.partition_by = True
|
||||||
|
opts.group_by = True
|
||||||
|
opts.norm_tb = False
|
||||||
|
sql_generator = TSMATestSQLGenerator(opts)
|
||||||
|
sql = sql_generator.generate_one(
|
||||||
|
','.join(func_list), ['db.meters', 'db.meters', 'db.t1', 'db.t9'], '', interval_list)
|
||||||
|
ctxs.append(TSMAQCBuilder().with_sql(sql).ignore_query_table(
|
||||||
|
).ignore_res_order(sql_generator.can_ignore_res_order()).get_qc())
|
||||||
|
return ctxs
|
||||||
|
|
||||||
|
def test_bigger_tsma_interval(self):
|
||||||
|
db = 'db'
|
||||||
|
tb = 'meters'
|
||||||
|
func = ['max(c1)', 'min(c1)', 'min(c2)', 'max(c2)', 'avg(c1)', 'count(ts)']
|
||||||
|
self.init_data(db,10, 10000, 1500000000000, 11000000)
|
||||||
|
examples = [
|
||||||
|
('10m', '1h', True), ('10m','1d',True), ('1m', '120s', True), ('1h','1d',True),
|
||||||
|
('12h', '1y', False), ('1h', '1n', True), ('1h', '1y', True),
|
||||||
|
('12n', '1y', False), ('2d','1n',False), ('55m', '55h', False), ('7m','7d',False),
|
||||||
|
]
|
||||||
|
tdSql.execute('use db')
|
||||||
|
for (i, ri, ret) in examples:
|
||||||
|
self.test_create_recursive_tsma_interval(db, tb, func, i, ri, ret, -2147471099)
|
||||||
|
|
||||||
|
self.create_tsma('tsma1', db, tb, func, '1h')
|
||||||
|
self.create_recursive_tsma('tsma1', 'tsma2', db, '1n', tb, func)
|
||||||
|
self.create_recursive_tsma('tsma2', 'tsma3', db, '1y', tb, func)
|
||||||
|
self.check(self.test_bigger_tsma_interval_query(func))
|
||||||
|
|
||||||
|
ctxs = []
|
||||||
|
ctxs.append(TSMAQCBuilder().with_sql('SELECT max(c1) FROM db.meters').should_query_with_tsma('tsma3').get_qc())
|
||||||
|
ctxs.append(TSMAQCBuilder()
|
||||||
|
.with_sql('SELECT max(c1) FROM db.meters WHERE ts > "2024-09-03 18:40:00.324"')
|
||||||
|
.should_query_with_table('meters', '2024-09-03 18:40:00.325', '2024-12-31 23:59:59.999')
|
||||||
|
.should_query_with_tsma('tsma3', '2025-01-01 00:00:00.000', UsedTsma.TS_MAX)
|
||||||
|
.get_qc())
|
||||||
|
|
||||||
|
ctxs.append(TSMAQCBuilder()
|
||||||
|
.with_sql('SELECT max(c1) FROM db.meters WHERE ts >= "2024-09-03 18:00:00.000"')
|
||||||
|
.should_query_with_tsma('tsma1', '2024-09-03 18:00:00.000', '2024-12-31 23:59:59.999')
|
||||||
|
.should_query_with_tsma('tsma3', '2025-01-01 00:00:00.000', UsedTsma.TS_MAX)
|
||||||
|
.get_qc())
|
||||||
|
|
||||||
|
ctxs.append(TSMAQCBuilder()
|
||||||
|
.with_sql('SELECT max(c1) FROM db.meters WHERE ts >= "2024-09-01 00:00:00.000"')
|
||||||
|
.should_query_with_tsma('tsma2', '2024-09-01 00:00:00.000', '2024-12-31 23:59:59.999')
|
||||||
|
.should_query_with_tsma('tsma3', '2025-01-01 00:00:00.000', UsedTsma.TS_MAX)
|
||||||
|
.get_qc())
|
||||||
|
|
||||||
|
ctxs.append(TSMAQCBuilder()
|
||||||
|
.with_sql("SELECT max(c1) FROM db.meters INTERVAL(12n)")
|
||||||
|
.should_query_with_tsma('tsma3')
|
||||||
|
.get_qc())
|
||||||
|
|
||||||
|
ctxs.append(TSMAQCBuilder()
|
||||||
|
.with_sql("SELECT max(c1) FROM db.meters INTERVAL(13n)")
|
||||||
|
.should_query_with_tsma('tsma2')
|
||||||
|
.get_qc())
|
||||||
|
|
||||||
|
ctxs.append(TSMAQCBuilder()
|
||||||
|
.with_sql("SELECT max(c1),min(c1),min(c2),max(c2),avg(c1),count(ts) FROM db.t9 WHERE ts > '2018-09-17 08:16:00'")
|
||||||
|
.should_query_with_table('t9', '2018-09-17 08:16:00.001', '2018-12-31 23:59:59:999')
|
||||||
|
.should_query_with_tsma_ctb('db', 'tsma3', 't9', '2019-01-01')
|
||||||
|
.get_qc())
|
||||||
|
|
||||||
|
ctxs.append(TSMAQCBuilder()
|
||||||
|
.with_sql("SELECT max(c1), _wstart FROM db.meters WHERE ts >= '2024-09-03 18:40:00.324' INTERVAL(1d)")
|
||||||
|
.should_query_with_table('meters', '2024-09-03 18:40:00.324', '2024-09-03 23:59:59:999')
|
||||||
|
.should_query_with_tsma('tsma1', '2024-09-04 00:00:00.000')
|
||||||
|
.get_qc())
|
||||||
|
|
||||||
|
ctxs.append(TSMAQCBuilder()
|
||||||
|
.with_sql("SELECT max(c1), _wstart FROM db.meters WHERE ts >= '2024-09-03 18:40:00.324' INTERVAL(1n)")
|
||||||
|
.should_query_with_table('meters', '2024-09-03 18:40:00.324', '2024-09-30 23:59:59:999')
|
||||||
|
.should_query_with_tsma('tsma2', '2024-10-01 00:00:00.000')
|
||||||
|
.get_qc())
|
||||||
|
|
||||||
|
self.check(ctxs)
|
||||||
|
tdSql.execute('drop database db')
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
|
||||||
|
event = threading.Event()
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue