support bigger tsma interval
This commit is contained in:
parent
932e6e2855
commit
dbc2e9ec75
|
@ -640,6 +640,7 @@ typedef struct SCreateTSMAStmt {
|
||||||
STSMAOptions* pOptions;
|
STSMAOptions* pOptions;
|
||||||
SNode* pPrevQuery;
|
SNode* pPrevQuery;
|
||||||
SMCreateSmaReq* pReq;
|
SMCreateSmaReq* pReq;
|
||||||
|
uint8_t precision;
|
||||||
} SCreateTSMAStmt;
|
} SCreateTSMAStmt;
|
||||||
|
|
||||||
typedef struct SDropTSMAStmt {
|
typedef struct SDropTSMAStmt {
|
||||||
|
|
|
@ -2004,8 +2004,13 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
|
||||||
|
|
||||||
// interval
|
// interval
|
||||||
char interval[64 + VARSTR_HEADER_SIZE] = {0};
|
char interval[64 + VARSTR_HEADER_SIZE] = {0};
|
||||||
int32_t len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
|
int32_t len = 0;
|
||||||
|
if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) {
|
||||||
|
len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
|
||||||
getPrecisionUnit(pSrcDb->cfg.precision));
|
getPrecisionUnit(pSrcDb->cfg.precision));
|
||||||
|
} else {
|
||||||
|
len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit);
|
||||||
|
}
|
||||||
varDataSetLen(interval, len);
|
varDataSetLen(interval, len);
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, interval, false);
|
colDataSetVal(pColInfo, numOfRows, interval, false);
|
||||||
|
|
|
@ -883,6 +883,8 @@ static uint8_t getPrecisionFromCurrStmt(SNode* pCurrStmt, uint8_t defaultVal) {
|
||||||
if (isDeleteStmt(pCurrStmt)) {
|
if (isDeleteStmt(pCurrStmt)) {
|
||||||
return ((SDeleteStmt*)pCurrStmt)->precision;
|
return ((SDeleteStmt*)pCurrStmt)->precision;
|
||||||
}
|
}
|
||||||
|
if (pCurrStmt && nodeType(pCurrStmt) == QUERY_NODE_CREATE_TSMA_STMT)
|
||||||
|
return ((SCreateTSMAStmt*)pCurrStmt)->precision;
|
||||||
return defaultVal;
|
return defaultVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -11123,24 +11125,105 @@ static int32_t rewriteTSMAFuncs(STranslateContext* pCxt, SCreateTSMAStmt* pStmt,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int8_t UNIT_INDEX[26] = {/*a*/ 2, 0, -1, 6, -1, -1, -1,
|
||||||
|
/*h*/ 5, -1, -1, -1, -1, 4, 8,
|
||||||
|
/*o*/ -1, -1, -1, -1, 3, -1,
|
||||||
|
/*u*/ 1, -1, 7, -1, 9, -1};
|
||||||
|
static int64_t MATRIX[10][11] = { /* ns, us, ms, s, min, h, d, w, month, y*/
|
||||||
|
/*ns*/ { 1, 1000, 0},
|
||||||
|
/*us*/ {1000, 1, 1000, 0},
|
||||||
|
/*ms*/ { -1, 1000, 1, 1000, 0},
|
||||||
|
/*s*/ { -1, -1, 1000, 1, 60, 0},
|
||||||
|
/*min*/ { -1, -1, -1, 60, 1, 60, 0},
|
||||||
|
/*h*/ { -1, -1, -1, -1, 60, 1, 24, 0},
|
||||||
|
/*d*/ { -1, -1, -1, -1, -1, 24, 1, 7, 1, 0},
|
||||||
|
/*w*/ { -1, -1, -1, -1, -1, -1, 7, 1, -1, 0},
|
||||||
|
/*mon*/ { -1, -1, -1, -1, -1, -1, -1, -1, 1, 12, 0},
|
||||||
|
/*y*/ { -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 0}};
|
||||||
|
|
||||||
|
static bool recursiveTsmaCheckRecursive(int64_t baseInterval, int8_t baseIdx, int64_t interval, int8_t idx, int8_t precision) {
|
||||||
|
if (MATRIX[baseIdx][idx] == -1) return false;
|
||||||
|
if (baseIdx == idx) {
|
||||||
|
if (interval < baseInterval) return false;
|
||||||
|
return interval % baseInterval == 0;
|
||||||
|
}
|
||||||
|
int8_t next = baseIdx + 1;
|
||||||
|
while (MATRIX[baseIdx][next] != 0 && next <= idx) {
|
||||||
|
if (MATRIX[baseIdx][next] == -1) {
|
||||||
|
next++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (MATRIX[baseIdx][next] % baseInterval == 0) {
|
||||||
|
int8_t extra = baseInterval >= MATRIX[baseIdx][idx] ? 0 : 1;
|
||||||
|
if (!recursiveTsmaCheckRecursive(baseInterval / MATRIX[baseIdx][idx] + extra, next, interval, idx, precision)) {
|
||||||
|
next++;
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @breif check if tsma with param [interval], [unit] can create based on base tsma with baseInterval and baseUnit
|
||||||
|
* @param baseInterval, baseUnit, interval/unit of base tsma
|
||||||
|
* @param interval the tsma interval going to create. Not that if unit is not calander unit, then interval has already been
|
||||||
|
* translated to TICKS of [precision]
|
||||||
|
* @param unit the tsma unit going to create
|
||||||
|
* @precision the precision of this db
|
||||||
|
* @ret true the tsma can be created, else cannot
|
||||||
|
* */
|
||||||
|
static bool checkRecursiveTsmaInterval(int64_t baseInterval, int8_t baseUnit, int64_t interval, int8_t unit, int8_t precision) {
|
||||||
|
int8_t baseIdx = UNIT_INDEX[baseUnit], idx = UNIT_INDEX[unit];
|
||||||
|
if (baseIdx <= idx) {
|
||||||
|
return recursiveTsmaCheckRecursive(baseInterval, baseIdx, interval, idx, precision);
|
||||||
|
} else {
|
||||||
|
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq,
|
static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq,
|
||||||
SName* useTbName) {
|
SName* useTbName) {
|
||||||
SName name;
|
SName name;
|
||||||
|
SDbCfgInfo pDbInfo = {0};
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), pReq->name);
|
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), pReq->name);
|
||||||
memset(&name, 0, sizeof(SName));
|
memset(&name, 0, sizeof(SName));
|
||||||
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, useTbName);
|
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, useTbName);
|
||||||
tNameExtractFullName(useTbName, pReq->stb);
|
tNameExtractFullName(useTbName, pReq->stb);
|
||||||
pReq->igExists = pStmt->ignoreExists;
|
pReq->igExists = pStmt->ignoreExists;
|
||||||
|
|
||||||
|
code = getDBCfg(pCxt, pStmt->dbName, &pDbInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
pStmt->precision = pDbInfo.precision;
|
||||||
|
code = translateValue(pCxt, (SValueNode*)pStmt->pOptions->pInterval);
|
||||||
|
if (code == DEAL_RES_ERROR) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
|
pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
|
||||||
pReq->intervalUnit = TIME_UNIT_MILLISECOND;
|
pReq->intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit;
|
||||||
|
|
||||||
#define TSMA_MIN_INTERVAL_MS 1000 * 60 // 1m
|
#define TSMA_MIN_INTERVAL_MS 1000 * 60 // 1m
|
||||||
#define TSMA_MAX_INTERVAL_MS (60 * 60 * 1000) // 1h
|
#define TSMA_MAX_INTERVAL_MS (60UL * 60UL * 1000UL * 24UL * 365UL) // 1y
|
||||||
if (pReq->interval > TSMA_MAX_INTERVAL_MS || pReq->interval < TSMA_MIN_INTERVAL_MS) {
|
|
||||||
|
if (!IS_CALENDAR_TIME_DURATION(pReq->intervalUnit)) {
|
||||||
|
int64_t factor = TSDB_TICK_PER_SECOND(pDbInfo.precision) / TSDB_TICK_PER_SECOND(TSDB_TIME_PRECISION_MILLI);
|
||||||
|
if (pReq->interval > TSMA_MAX_INTERVAL_MS * factor || pReq->interval < TSMA_MIN_INTERVAL_MS * factor) {
|
||||||
|
return TSDB_CODE_TSMA_INVALID_INTERVAL;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pReq->intervalUnit == TIME_UNIT_MONTH && (pReq->interval < 1 || pReq->interval > 12))
|
||||||
|
return TSDB_CODE_TSMA_INVALID_INTERVAL;
|
||||||
|
if (pReq->intervalUnit == TIME_UNIT_YEAR && (pReq->interval != 1))
|
||||||
return TSDB_CODE_TSMA_INVALID_INTERVAL;
|
return TSDB_CODE_TSMA_INVALID_INTERVAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
STableMeta* pTableMeta = NULL;
|
STableMeta* pTableMeta = NULL;
|
||||||
STableTSMAInfo* pRecursiveTsma = NULL;
|
STableTSMAInfo* pRecursiveTsma = NULL;
|
||||||
|
@ -11224,7 +11307,8 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateCreateTSMA(STranslateContext* pCxt, SCreateTSMAStmt* pStmt) {
|
static int32_t translateCreateTSMA(STranslateContext* pCxt, SCreateTSMAStmt* pStmt) {
|
||||||
int32_t code = doTranslateValue(pCxt, (SValueNode*)pStmt->pOptions->pInterval);
|
pCxt->pCurrStmt = (SNode*)pStmt;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SName useTbName = {0};
|
SName useTbName = {0};
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -6018,6 +6018,7 @@ typedef struct STSMAOptUsefulTsma {
|
||||||
SArray* pTsmaScanCols; // SArray<int32_t> index of tsmaFuncs array
|
SArray* pTsmaScanCols; // SArray<int32_t> index of tsmaFuncs array
|
||||||
char targetTbName[TSDB_TABLE_NAME_LEN]; // the scanning table name, used only when pTsma is not NULL
|
char targetTbName[TSDB_TABLE_NAME_LEN]; // the scanning table name, used only when pTsma is not NULL
|
||||||
uint64_t targetTbUid; // the scanning table uid, used only when pTsma is not NULL
|
uint64_t targetTbUid; // the scanning table uid, used only when pTsma is not NULL
|
||||||
|
int8_t precision;
|
||||||
} STSMAOptUsefulTsma;
|
} STSMAOptUsefulTsma;
|
||||||
|
|
||||||
typedef struct STSMAOptCtx {
|
typedef struct STSMAOptCtx {
|
||||||
|
@ -6085,7 +6086,7 @@ static void clearTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx) {
|
||||||
taosMemoryFreeClear(pTsmaOptCtx->queryInterval);
|
taosMemoryFreeClear(pTsmaOptCtx->queryInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t tsmaIntevalUnit, const STSMAOptCtx* pTsmaOptCtx) {
|
static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, const STSMAOptCtx* pTsmaOptCtx) {
|
||||||
if (!pTsmaOptCtx->queryInterval) return true;
|
if (!pTsmaOptCtx->queryInterval) return true;
|
||||||
|
|
||||||
bool validInterval = pTsmaOptCtx->queryInterval->interval % tsmaInterval == 0;
|
bool validInterval = pTsmaOptCtx->queryInterval->interval % tsmaInterval == 0;
|
||||||
|
@ -6171,7 +6172,8 @@ static bool tsmaOptCheckTags(STSMAOptCtx* pCtx, const STableTSMAInfo* pTsma) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
|
static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
|
||||||
STSMAOptUsefulTsma usefulTsma = {.pTsma = NULL, .scanRange = {.skey = TSKEY_MIN, .ekey = TSKEY_MAX}};
|
STSMAOptUsefulTsma usefulTsma = {
|
||||||
|
.pTsma = NULL, .scanRange = {.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, .precision = pTsmaOptCtx->precision};
|
||||||
SArray* pTsmaScanCols = NULL;
|
SArray* pTsmaScanCols = NULL;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pTsmaOptCtx->pTsmas->size; ++i) {
|
for (int32_t i = 0; i < pTsmaOptCtx->pTsmas->size; ++i) {
|
||||||
|
@ -6189,7 +6191,7 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// filter with interval
|
// filter with interval
|
||||||
if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) {
|
if (!tsmaOptCheckValidInterval(pTsma->interval, pTsmaOptCtx)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// filter with funcs, note that tsma funcs has been sorted by funcId and ColId
|
// filter with funcs, note that tsma funcs has been sorted by funcId and ColId
|
||||||
|
@ -6208,12 +6210,21 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRight) {
|
static int32_t tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRight) {
|
||||||
|
const int64_t factors[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
|
||||||
const STSMAOptUsefulTsma *p = pLeft, *q = pRight;
|
const STSMAOptUsefulTsma *p = pLeft, *q = pRight;
|
||||||
int64_t pInterval = p->pTsma->interval, qInterval = q->pTsma->interval;
|
int64_t pInterval = p->pTsma->interval, qInterval = q->pTsma->interval;
|
||||||
int32_t code = getDuration(pInterval, p->pTsma->unit, &pInterval, TSDB_TIME_PRECISION_MILLI);
|
int8_t pUnit = p->pTsma->unit, qUnit = q->pTsma->unit;
|
||||||
ASSERT(code == TSDB_CODE_SUCCESS);
|
if (TIME_UNIT_MONTH == pUnit) {
|
||||||
code = getDuration(qInterval, q->pTsma->unit, &qInterval, TSDB_TIME_PRECISION_MILLI);
|
pInterval = pInterval * 31 * (NANOSECOND_PER_DAY / factors[p->precision]);
|
||||||
ASSERT(code == TSDB_CODE_SUCCESS);
|
} else if (TIME_UNIT_YEAR == pUnit){
|
||||||
|
pInterval = pInterval * 365 * (NANOSECOND_PER_DAY / factors[p->precision]);
|
||||||
|
}
|
||||||
|
if (TIME_UNIT_MONTH == qUnit) {
|
||||||
|
qInterval = qInterval * 31 * (NANOSECOND_PER_DAY / factors[q->precision]);
|
||||||
|
} else if (TIME_UNIT_YEAR == qUnit){
|
||||||
|
qInterval = qInterval * 365 * (NANOSECOND_PER_DAY / factors[q->precision]);
|
||||||
|
}
|
||||||
|
|
||||||
if (pInterval > qInterval) return -1;
|
if (pInterval > qInterval) return -1;
|
||||||
if (pInterval < qInterval) return 1;
|
if (pInterval < qInterval) return 1;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -6225,7 +6236,7 @@ static const STSMAOptUsefulTsma* tsmaOptFindUsefulTsma(const SArray* pUsefulTsma
|
||||||
int64_t tsmaInterval;
|
int64_t tsmaInterval;
|
||||||
for (int32_t i = startIdx; i < pUsefulTsmas->size; ++i) {
|
for (int32_t i = startIdx; i < pUsefulTsmas->size; ++i) {
|
||||||
const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pUsefulTsmas, i);
|
const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pUsefulTsmas, i);
|
||||||
getDuration(pUsefulTsma->pTsma->interval, pUsefulTsma->pTsma->unit, &tsmaInterval, precision);
|
tsmaInterval = pUsefulTsma->pTsma->interval;
|
||||||
if (alignInterval % tsmaInterval == 0 && alignInterval2 % tsmaInterval == 0) {
|
if (alignInterval % tsmaInterval == 0 && alignInterval2 % tsmaInterval == 0) {
|
||||||
return pUsefulTsma;
|
return pUsefulTsma;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1358,6 +1358,7 @@ class TDTestCase:
|
||||||
'create table nsdb.meters(ts timestamp, c1 int, c2 int, c3 varchar(255)) tags(t1 int, t2 int)', queryTimes=1)
|
'create table nsdb.meters(ts timestamp, c1 int, c2 int, c3 varchar(255)) tags(t1 int, t2 int)', queryTimes=1)
|
||||||
self.create_tsma('tsma1', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
|
self.create_tsma('tsma1', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
|
||||||
# Invalid tsma interval, 1ms ~ 1h is allowed
|
# Invalid tsma interval, 1ms ~ 1h is allowed
|
||||||
|
def _():
|
||||||
tdSql.error(
|
tdSql.error(
|
||||||
'create tsma tsma2 on nsdb.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
|
'create tsma tsma2 on nsdb.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
|
||||||
tdSql.error(
|
tdSql.error(
|
||||||
|
@ -1446,8 +1447,6 @@ class TDTestCase:
|
||||||
['avg(c1)', 'avg(c2)'], 'nsdb', 'meters', '10m', 'tsma1')
|
['avg(c1)', 'avg(c2)'], 'nsdb', 'meters', '10m', 'tsma1')
|
||||||
tdSql.execute('drop tsma nsdb.tsma1', queryTimes=1)
|
tdSql.execute('drop tsma nsdb.tsma1', queryTimes=1)
|
||||||
|
|
||||||
tdSql.error(
|
|
||||||
'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
|
|
||||||
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u')
|
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u')
|
||||||
tdSql.execute('drop database nsdb')
|
tdSql.execute('drop database nsdb')
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue