diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 558cce0b6d..c220d13703 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1404,8 +1404,14 @@ int32_t tDeserializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp void tFreeSDnodeListRsp(SDnodeListRsp* pRsp); typedef struct { - SUseDbRsp* useDbRsp; - SDbCfgRsp* cfgRsp; + SArray* pTsmas; // SArray +} STableTSMAInfoRsp; + +typedef struct { + SUseDbRsp* useDbRsp; + SDbCfgRsp* cfgRsp; + STableTSMAInfoRsp* pTsmaRsp; + int32_t dbTsmaVersion; } SDbHbRsp; typedef struct { @@ -4234,10 +4240,6 @@ typedef struct { bool fillHistoryFinished; } STableTSMAInfo; -typedef struct { - SArray* pTsmas; // SArray -} STableTSMAInfoRsp; - int32_t tSerializeTableTSMAInfoRsp(void* buf, int32_t bufLen, const STableTSMAInfoRsp* pRsp); int32_t tDeserializeTableTSMAInfoRsp(void* buf, int32_t bufLen, STableTSMAInfoRsp* pRsp); int32_t tCloneTbTSMAInfo(STableTSMAInfo* pInfo, STableTSMAInfo** pRes); diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 6ea64c73fd..90cc4ac157 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -158,6 +158,7 @@ typedef struct SDbCacheInfo { int32_t cfgVersion; int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT int64_t stateTs; + int32_t tsmaVersion; } SDbCacheInfo; typedef struct SDynViewVersion { @@ -404,7 +405,7 @@ int32_t ctgdEnableDebug(char* option, bool enable); int32_t ctgdHandleDbgCommand(char* command); -int32_t catalogAsyncUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** pTsma); +int32_t catalogAsyncUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** pTsma, int32_t tsmaVersion); int32_t catalogUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** ppTsma); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 14257f8a50..78e8b04a13 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -234,6 +234,15 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog catalogUpdateDbCfg(pCatalog, rsp->cfgRsp->db, rsp->cfgRsp->dbId, rsp->cfgRsp); rsp->cfgRsp = NULL; } + if (rsp->pTsmaRsp) { + if (rsp->pTsmaRsp->pTsmas) { + for (int32_t i = 0; i < rsp->pTsmaRsp->pTsmas->size; ++i) { + STableTSMAInfo* pTsma = taosArrayGetP(rsp->pTsmaRsp->pTsmas, i); + catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion); + } + taosArrayClear(rsp->pTsmaRsp->pTsmas); + } + } } _return: @@ -797,6 +806,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl db->cfgVersion = htonl(db->cfgVersion); db->numOfTable = htonl(db->numOfTable); db->stateTs = htobe64(db->stateTs); + db->tsmaVersion = htonl(db->tsmaVersion); } SKv kv = { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d360c86ef5..d0a844b185 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -67,6 +67,8 @@ static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq); static int32_t tDecodeSBatchDeleteReqCommon(SDecoder *pDecoder, SBatchDeleteReq *pReq); +static int32_t tEncodeTableTSMAInfoRsp(SEncoder *pEncoder, const STableTSMAInfoRsp *pRsp); +static int32_t tDecodeTableTSMAInfoRsp(SDecoder* pDecoder, STableTSMAInfoRsp* pRsp); int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { if (pMsg == NULL) { @@ -3666,6 +3668,14 @@ int32_t tSerializeSDbHbRspImp(SEncoder *pEncoder, const SDbHbRsp *pRsp) { if (tEncodeI8(pEncoder, 0) < 0) return -1; } + if (pRsp->pTsmaRsp) { + if (tEncodeI8(pEncoder, 1) < 0) return -1; + if (tEncodeTableTSMAInfoRsp(pEncoder, pRsp->pTsmaRsp) < 0) return -1; + } else { + if (tEncodeI8(pEncoder, 0) < 0) return -1; + } + if (tEncodeI32(pEncoder, pRsp->dbTsmaVersion) < 0) return -1; + return 0; } @@ -3746,6 +3756,17 @@ int32_t tDeserializeSDbHbRspImp(SDecoder *decoder, SDbHbRsp *pRsp) { if (NULL == pRsp->cfgRsp) return -1; if (tDeserializeSDbCfgRspImpl(decoder, pRsp->cfgRsp) < 0) return -1; } + if (!tDecodeIsEnd(decoder)) { + if (tDecodeI8(decoder, &flag) < 0) return -1; + if (flag) { + pRsp->pTsmaRsp = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp)); + if (!pRsp->pTsmaRsp) return -1; + if (tDecodeTableTSMAInfoRsp(decoder, pRsp->pTsmaRsp) < 0) return -1; + } + } + if (!tDecodeIsEnd(decoder)) { + if (tDecodeI32(decoder, &pRsp->dbTsmaVersion) < 0) return -1; + } return 0; } @@ -3795,6 +3816,10 @@ void tFreeSDbHbRsp(SDbHbRsp *pDbRsp) { tFreeSDbCfgRsp(pDbRsp->cfgRsp); taosMemoryFree(pDbRsp->cfgRsp); } + if (pDbRsp->pTsmaRsp) { + tFreeTableTSMAInfoRsp(pDbRsp->pTsmaRsp); + taosMemoryFree(pDbRsp->pTsmaRsp); + } } void tFreeSDbHbBatchRsp(SDbHbBatchRsp *pRsp) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index fec4d958ca..f41e56a9a0 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -398,6 +398,7 @@ typedef struct { SRWLatch lock; int64_t stateTs; int64_t compactStartTime; + int32_t tsmaVersion; } SDbObj; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndSma.h b/source/dnode/mnode/impl/inc/mndSma.h index ad727d67ea..6e466aeea6 100644 --- a/source/dnode/mnode/impl/inc/mndSma.h +++ b/source/dnode/mnode/impl/inc/mndSma.h @@ -31,6 +31,7 @@ int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist); int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t numOfTsmas, void **ppRsp, int32_t *pRspLen); +int32_t mndGetDbTsmas(SMnode *pMnode, const char *dbFName, uint64_t dbUid, STableTSMAInfoRsp *pRsp, bool *exist); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index e34a5ee281..26649df530 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -36,7 +36,7 @@ #include "tjson.h" #define DB_VER_NUMBER 1 -#define DB_RESERVE_SIZE 32 +#define DB_RESERVE_SIZE 28 static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw); static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb); @@ -146,6 +146,7 @@ SSdbRaw *mndDbActionEncode(SDbObj *pDb) { SDB_SET_INT32(pRaw, dataPos, pDb->cfg.s3KeepLocal, _OVER) SDB_SET_INT8(pRaw, dataPos, pDb->cfg.s3Compact, _OVER) SDB_SET_INT8(pRaw, dataPos, pDb->cfg.withArbitrator, _OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->tsmaVersion, _OVER); SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) @@ -241,6 +242,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.s3KeepLocal, _OVER) SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.s3Compact, _OVER) SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.withArbitrator, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->tsmaVersion, _OVER); SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER) taosInitRWLatch(&pDb->lock); @@ -347,6 +349,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) { pOld->cfg.s3Compact = pNew->cfg.s3Compact; pOld->cfg.withArbitrator = pNew->cfg.withArbitrator; pOld->compactStartTime = pNew->compactStartTime; + pOld->tsmaVersion = pNew->tsmaVersion; taosWUnLockLatch(&pOld->lock); return 0; } @@ -681,6 +684,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, dbObj.uid = mndGenerateUid(dbObj.name, TSDB_DB_FNAME_LEN); dbObj.cfgVersion = 1; dbObj.vgVersion = 1; + dbObj.tsmaVersion = 1; memcpy(dbObj.createUser, pUser->user, TSDB_USER_LEN); dbObj.cfg = (SDbCfg){ .numOfVgroups = pCreate->numOfVgroups, @@ -1682,6 +1686,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs, pDbCacheInfo->cfgVersion = htonl(pDbCacheInfo->cfgVersion); pDbCacheInfo->numOfTable = htonl(pDbCacheInfo->numOfTable); pDbCacheInfo->stateTs = be64toh(pDbCacheInfo->stateTs); + pDbCacheInfo->tsmaVersion = htonl(pDbCacheInfo->tsmaVersion); SDbHbRsp rsp = {0}; @@ -1720,7 +1725,8 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs, int32_t numOfTable = mndGetDBTableNum(pDb, pMnode); if (pDbCacheInfo->vgVersion >= pDb->vgVersion && pDbCacheInfo->cfgVersion >= pDb->cfgVersion && - numOfTable == pDbCacheInfo->numOfTable && pDbCacheInfo->stateTs == pDb->stateTs) { + numOfTable == pDbCacheInfo->numOfTable && pDbCacheInfo->stateTs == pDb->stateTs && + pDbCacheInfo->tsmaVersion >= pDb->tsmaVersion) { mTrace("db:%s, valid dbinfo, vgVersion:%d cfgVersion:%d stateTs:%" PRId64 " numOfTables:%d, not changed vgVersion:%d cfgVersion:%d stateTs:%" PRId64 " numOfTables:%d", pDbCacheInfo->dbFName, pDbCacheInfo->vgVersion, pDbCacheInfo->cfgVersion, pDbCacheInfo->stateTs, @@ -1739,6 +1745,16 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs, mndDumpDbCfgInfo(rsp.cfgRsp, pDb); } + if (pDbCacheInfo->tsmaVersion != pDb->tsmaVersion) { + rsp.pTsmaRsp = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp)); + if (rsp.pTsmaRsp) rsp.pTsmaRsp->pTsmas = taosArrayInit(4, POINTER_BYTES); + if (rsp.pTsmaRsp && rsp.pTsmaRsp->pTsmas) { + rsp.dbTsmaVersion = pDb->tsmaVersion; + bool exist = false; + mndGetDbTsmas(pMnode, 0, pDb->uid, rsp.pTsmaRsp, &exist); + } + } + if (pDbCacheInfo->vgVersion < pDb->vgVersion || numOfTable != pDbCacheInfo->numOfTable || pDbCacheInfo->stateTs != pDb->stateTs) { rsp.useDbRsp = taosMemoryCalloc(1, sizeof(SUseDbRsp)); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 88dff29d30..0753114e13 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -67,7 +67,7 @@ typedef struct SCreateTSMACxt { const SMCreateSmaReq *pCreateSmaReq; const SMDropSmaReq * pDropSmaReq; }; - const SDbObj * pDb; + SDbObj *pDb; SStbObj * pSrcStb; SSmaObj * pSma; const SSmaObj * pBaseSma; @@ -1463,7 +1463,7 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) { pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pCreateStreamReq->numOfTags, sizeof(SField)); SField f = {0}; if (pCxt->pSrcStb) { - for (int32_t idx = 0; idx < pCxt->pSrcStb->numOfTags; ++idx) { + for (int32_t idx = 0; idx < pCxt->pCreateStreamReq->numOfTags - 1; ++idx) { SSchema *pSchema = &pCxt->pSrcStb->pTags[idx]; f.bytes = pSchema->bytes; f.type = pSchema->type; @@ -1486,12 +1486,28 @@ static void mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) { pCxt->pDropStreamReq->sqlLen = strlen(pCxt->pDropStreamReq->sql); } -static int32_t mndCreateTSMASetCreateStreamRedoAction(SMnode* pMnode) { - return TSDB_CODE_SUCCESS; +static int32_t mndSetUpdateDbTsmaVersionPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) { + SSdbRaw *pRedoRaw = mndDbActionEncode(pOld); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) { + sdbFreeRaw(pRedoRaw); + return -1; + } + + (void)sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); + return 0; } -static int32_t mndCreateTSMASetCreateStreamUndoAction(SMnode* pMnode) { - return TSDB_CODE_SUCCESS; +static int32_t mndSetUpdateDbTsmaVersionCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) { + SSdbRaw *pCommitRaw = mndDbActionEncode(pNew); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + sdbFreeRaw(pCommitRaw); + return -1; + } + + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + return 0; } static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) { @@ -1541,6 +1557,11 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) { goto _OVER; } + SDbObj newDb = {0}; + memcpy(&newDb, pCxt->pDb, sizeof(SDbObj)); + newDb.tsmaVersion++; + if (mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb) != 0) goto _OVER; + if (mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER; if (mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER; @@ -1630,7 +1651,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { if (createReq.normSourceTbUid == 0) { pStb = mndAcquireStb(pMnode, createReq.stb); - if (!pStb) { + if (!pStb && !createReq.recursiveTsma) { mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb); terrno = TSDB_CODE_MND_STB_NOT_EXIST; goto _OVER; @@ -1684,6 +1705,9 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { terrno = TSDB_CODE_MND_SMA_NOT_EXIST; goto _OVER; } + if (!pStb) { + createReq.normSourceTbUid = pBaseTsma->stbUid; + } } SCreateTSMACxt cxt = { @@ -1770,6 +1794,11 @@ static int32_t mndDropTSMA(SCreateTSMACxt* pCxt) { goto _OVER; } + SDbObj newDb = {0}; + memcpy(&newDb, pCxt->pDb, sizeof(SDbObj)); + newDb.tsmaVersion++; + if (mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb) != 0) goto _OVER; + if (mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb) != 0) goto _OVER; if (mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER; if (mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER; if (mndTransAppendRedoAction(pTrans, &dropStreamRedoAction) != 0) goto _OVER; @@ -1880,6 +1909,7 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) { sdbRelease(pMnode->pSdb, pSma); + if (pSrcDb) mndReleaseDb(pMnode, pSrcDb); continue; } @@ -2115,7 +2145,9 @@ static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rs return 0; } -static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *rsp, bool *exist) { +typedef bool (*tsmaFilter)(const SSmaObj* pSma, void* param); + +static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilter filtered, void* param, bool* exist) { int32_t code = -1; SSmaObj * pSma = NULL; SSmaObj * pBaseTsma = NULL; @@ -2123,20 +2155,12 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp void * pIter = NULL; SStreamObj * pStreamObj = NULL; SStbObj * pStb = NULL; - /* - SStbObj *pStb = mndAcquireStb(pMnode, tbFName); - if (NULL == pStb) { - *exist = false; - return TSDB_CODE_SUCCESS; - } - mndReleaseStb(pMnode, pStb); - */ while (1) { pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); if (pIter == NULL) break; - if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) { + if (filtered(pSma, param)) { sdbRelease(pSdb, pSma); continue; } @@ -2182,7 +2206,7 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp sdbCancelFetch(pSdb, pIter); return code; } - if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) { + if (NULL == taosArrayPush(pRsp->pTsmas, &pTsma)) { terrno = TSDB_CODE_OUT_OF_MEMORY; tFreeTableTSMAInfo(pTsma); sdbCancelFetch(pSdb, pIter); @@ -2193,6 +2217,24 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp return TSDB_CODE_SUCCESS; } +static bool tsmaTbFilter(const SSmaObj* pSma, void* param) { + const char* tbFName = param; + return pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName) != 0; +} + +static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *pRsp, bool *exist) { + return mndGetSomeTsmas(pMnode, pRsp, tsmaTbFilter, tbFName, exist); +} + +static bool tsmaDbFilter(const SSmaObj* pSma, void* param) { + uint64_t *dbUid = param; + return pSma->dbUid != *dbUid; +} + +int32_t mndGetDbTsmas(SMnode *pMnode, const char *dbFName, uint64_t dbUid, STableTSMAInfoRsp *pRsp, bool *exist) { + return mndGetSomeTsmas(pMnode, pRsp, tsmaDbFilter, &dbUid, exist); +} + static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) { STableTSMAInfoRsp rsp = {0}; int32_t code = -1; @@ -2312,7 +2354,10 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t SStbObj* pDestStb = mndAcquireStb(pMnode, pSma->dstTbName); if (!pDestStb) { mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName); + terrno = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo); mndReleaseSma(pMnode, pSma); + if (terrno) goto _OVER; + taosArrayPush(hbRsp.pTsmas, &pTsmaInfo); continue; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index b13d69edc7..033c55a581 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -3854,6 +3854,7 @@ static int32_t mndSetDropTbsRedoActions(SMnode* pMnode, STrans* pTrans, const SV static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg* pRsp, SMndDropTbsWithTsmaCtx* pCtx) { SMnode *pMnode = pRsp->info.node; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pRsp, "drop-tbs"); + mndTransSetChangeless(pTrans); if (pTrans == NULL) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 3ef16164f1..c5c14950b2 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -335,6 +335,7 @@ typedef struct SCtgDBCache { SHashObj* tbCache; // key:tbname, value:SCtgTbCache SHashObj* stbCache; // key:suid, value:char* SHashObj* tsmaCache; // key:tbname, value: SCtgTSMACache + int32_t tsmaVersion; uint64_t dbCacheNum[CTG_CI_MAX_VALUE]; uint64_t dbCacheSize; } SCtgDBCache; @@ -599,6 +600,8 @@ typedef struct SCtgDropViewMetaMsg { typedef struct SCtgUpdateTbTSMAMsg { SCatalog* pCtg; STableTSMAInfo* pTsma; + int32_t dbTsmaVersion; + uint64_t dbId; } SCtgUpdateTbTSMAMsg; typedef struct SCtgDropTbTSMAMsg { @@ -1148,7 +1151,7 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaName); int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* name, STableTSMAInfoRsp* out, SCtgTaskReq* tReq, int32_t reqType); -int32_t ctgUpdateTbTSMAEnqueue(SCatalog* pCtg, STSMACache** pTsma, bool syncOp); +int32_t ctgUpdateTbTSMAEnqueue(SCatalog* pCtg, STSMACache** pTsma, int32_t tsmaVersion, bool syncOp); int32_t ctgDropTSMAForTbEnqueue(SCatalog* pCtg, SName* pName, bool syncOp); int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool syncOp); int32_t ctgOpDropTbTSMA(SCtgCacheOperation* operation); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index f66d133453..e7d5a89d6f 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1790,13 +1790,13 @@ int32_t catalogGetViewMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* CTG_API_LEAVE(TSDB_CODE_OPS_NOT_SUPPORT); } -int32_t catalogAsyncUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** ppTsma) { +int32_t catalogAsyncUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** ppTsma, int32_t tsmaVersion) { CTG_API_ENTER(); if (!pCtg || !ppTsma) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } int32_t code = 0; - CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, ppTsma, false)); + CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, ppTsma, tsmaVersion, false)); _return: CTG_API_LEAVE(code); @@ -1808,7 +1808,7 @@ int32_t catalogUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** pTsma) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } int32_t code = 0; - CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, pTsma, true)); + CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, pTsma, 0, true)); _return: CTG_API_LEAVE(code); @@ -1844,7 +1844,7 @@ int32_t ctgGetTbTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName tsmasRsp.pTsmas = NULL; for (int32_t i = 0; i < (*ppRes)->size; ++i) { - CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, taosArrayGet((*ppRes), i), false)); + CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, taosArrayGet((*ppRes), i), 0, false)); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 68a1d1a471..9f20755313 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -2896,7 +2896,7 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf for (int32_t i = 0; i < taosArrayGetSize(pTsmas); ++i) { STableTSMAInfo* pInfo = taosArrayGetP(pTsmas, i); CTG_ERR_JRET(tCloneTbTSMAInfo(pInfo, &pTsma)); - CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, &pTsma, false)); + CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, &pTsma, 0, false)); } if (atomic_sub_fetch_32(&pCtx->fetchNum, 1) == 0) { diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index b29409a93b..0554257810 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -1415,7 +1415,7 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { CTG_CACHE_NUM_INC(CTG_CI_DB, 1); - SDbCacheInfo dbCacheInfo = {.dbId = newDBCache.dbId, .vgVersion = -1, .stateTs = 0, .cfgVersion = -1}; + SDbCacheInfo dbCacheInfo = {.dbId = newDBCache.dbId, .vgVersion = -1, .stateTs = 0, .cfgVersion = -1, .tsmaVersion = -1}; tstrncpy(dbCacheInfo.dbFName, dbFName, sizeof(dbCacheInfo.dbFName)); ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId); @@ -1802,7 +1802,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { bool newAdded = false; SDbCacheInfo dbCacheInfo = { - .dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .cfgVersion = -1, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs}; + .dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .cfgVersion = -1, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs, .tsmaVersion = -1}; SCtgDBCache *dbCache = NULL; CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache)); @@ -1846,6 +1846,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { if (dbCache->cfgCache.cfgInfo) { dbCacheInfo.cfgVersion = dbCache->cfgCache.cfgInfo->cfgVersion; + dbCacheInfo.tsmaVersion = dbCache->tsmaVersion; } vgCache->vgInfo = dbInfo; @@ -1913,6 +1914,7 @@ int32_t ctgOpUpdateDbCfg(SCtgCacheOperation *operation) { } else { cacheInfo.vgVersion = -1; } + cacheInfo.tsmaVersion = dbCache->tsmaVersion; ctgWLockDbCfgInfo(dbCache); @@ -3230,16 +3232,13 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx // get tsma cache pCache = taosHashAcquire(dbCache->tsmaCache, tsmaSourceTbName.tname, strlen(tsmaSourceTbName.tname)); - if (!pCache) { - ctgDebug("tsma for tb: %s.%s not in cache", dbFName, tsmaSourceTbName.tname); - ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TB_TSMA, &tsmaSourceTbName); + if (!pCache || !pCache->pTsmas || pCache->pTsmas->size == 0) { taosArrayPush(pCtx->pResList, &(SMetaRes){0}); - CTG_CACHE_NHIT_INC(CTG_CI_TBL_SMA, 1); continue; } CTG_LOCK(CTG_READ, &pCache->tsmaLock); - if (!pCache->pTsmas || pCache->pTsmas->size == 0 || hasOutOfDateTSMACache(pCache->pTsmas)) { + if (hasOutOfDateTSMACache(pCache->pTsmas)) { CTG_UNLOCK(CTG_READ, &pCache->tsmaLock); taosHashRelease(dbCache->tsmaCache, pCache); ctgDebug("tsma for tb: %s.%s not in cache", tsmaSourceTbName.tname, dbFName); @@ -3340,7 +3339,7 @@ int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaNam CTG_RET(code); } -int32_t ctgUpdateTbTSMAEnqueue(SCatalog *pCtg, STSMACache **pTsma, bool syncOp) { +int32_t ctgUpdateTbTSMAEnqueue(SCatalog *pCtg, STSMACache **pTsma, int32_t tsmaVersion, bool syncOp) { int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_UPDATE_TB_TSMA; @@ -3355,6 +3354,8 @@ int32_t ctgUpdateTbTSMAEnqueue(SCatalog *pCtg, STSMACache **pTsma, bool syncOp) msg->pCtg = pCtg; msg->pTsma = *pTsma; + msg->dbTsmaVersion = tsmaVersion; + msg->dbId = (*pTsma)->dbId; op->data = msg; @@ -3601,8 +3602,24 @@ int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation *operation) { } CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pTsmaInfo->dbFName, pTsmaInfo->dbId, &dbCache)); - CTG_ERR_JRET(ctgWriteTbTSMAToCache(pCtg, dbCache, pTsmaInfo->dbFName, pTsmaInfo->tb, &pTsmaInfo)); + if (dbCache && msg->dbTsmaVersion > 0) { + dbCache->tsmaVersion = msg->dbTsmaVersion; + SDbCacheInfo cacheInfo = {0}; + cacheInfo.dbId = dbCache->dbId; + if (dbCache->cfgCache.cfgInfo) { + cacheInfo.cfgVersion = dbCache->cfgCache.cfgInfo->cfgVersion; + tstrncpy(cacheInfo.dbFName, dbCache->cfgCache.cfgInfo->db, TSDB_DB_FNAME_LEN); + } + if (dbCache->vgCache.vgInfo) { + cacheInfo.vgVersion = dbCache->vgCache.vgInfo->vgVersion; + cacheInfo.numOfTable = dbCache->vgCache.vgInfo->numOfTable; + cacheInfo.stateTs = dbCache->vgCache.vgInfo->stateTs; + } + cacheInfo.tsmaVersion = dbCache->tsmaVersion; + CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &cacheInfo, cacheInfo.dbId, sizeof(SDbCacheInfo), + ctgDbCacheInfoSortCompare, ctgDbCacheInfoSearchCompare)); + } _return: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 5cf5e09a78..9811cfbca8 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10523,13 +10523,13 @@ static int32_t translateShowCreateView(STranslateContext* pCxt, SShowCreateViewS #endif } -SNode* createColumnNodeWithName(const char* name) { +static SColumnNode* createColumnNodeWithName(const char* name) { SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (!pCol) return NULL; tstrncpy(pCol->colName, name, TSDB_COL_NAME_LEN); tstrncpy(pCol->node.aliasName, name, TSDB_COL_NAME_LEN); tstrncpy(pCol->node.userAlias, name, TSDB_COL_NAME_LEN); - return (SNode*)pCol; + return pCol; } static bool sortFuncWithFuncId(SNode* pNode1, SNode* pNode2) { @@ -10609,10 +10609,11 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC } } if (TSDB_CODE_SUCCESS == code) { + int32_t partitionTagNum = pStmt->pOptions->recursiveTsma ? numOfTags - 1 : numOfTags; // append partition by tags SNode* pTagCol = NULL; - for (int32_t idx = 0; idx < numOfTags; ++idx) { - pTagCol = createColumnNodeWithName(pTags[idx].name); + for (int32_t idx = 0; idx < partitionTagNum; ++idx) { + pTagCol = (SNode*)createColumnNodeWithName(pTags[idx].name); if (!pTagCol) { code = TSDB_CODE_OUT_OF_MEMORY; break; @@ -10624,11 +10625,17 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC // sub table if (code == TSDB_CODE_SUCCESS) { SFunctionNode* pSubTable = NULL; - code = buildTSMAAstStreamSubTable(pStmt, pReq, pStmt->pOptions->recursiveTsma ? pTagCol : (SNode*)pTbnameFunc, (SNode**)&pSubTable); + pTagCol = NULL; + if (pTags && numOfTags > 0) { + pTagCol = (SNode*)createColumnNodeWithName(pTags[numOfTags - 1].name); + if (!pTagCol) code = TSDB_CODE_OUT_OF_MEMORY; + } if (code == TSDB_CODE_SUCCESS) { + code = buildTSMAAstStreamSubTable(pStmt, pReq, pStmt->pOptions->recursiveTsma ? pTagCol : (SNode*)pTbnameFunc, (SNode**)&pSubTable); info.pSubTable = (SNode*)pSubTable; } - code = nodesListMakeStrictAppend(&info.pTags, nodesCloneNode((SNode*)pTbnameFunc)); + if (code == TSDB_CODE_SUCCESS) + code = nodesListMakeStrictAppend(&info.pTags, pStmt->pOptions->recursiveTsma ? pTagCol : nodesCloneNode((SNode*)pTbnameFunc)); } } @@ -10769,9 +10776,9 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm memcpy(pStmt->originalTbName, pRecursiveTsma->tb, TSDB_TABLE_NAME_LEN); tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pRecursiveTsma->tb, useTbName), pReq->stb); numOfCols = pRecursiveTsma->pUsedCols->size; - numOfTags = pRecursiveTsma->pTags->size; + numOfTags = pRecursiveTsma->pTags ? pRecursiveTsma->pTags->size: 0; pCols = pRecursiveTsma->pUsedCols->pData; - pTags = pRecursiveTsma->pTags->pData; + pTags = pRecursiveTsma->pTags ? pRecursiveTsma->pTags->pData : NULL; code = getTableMeta(pCxt, pStmt->dbName, pRecursiveTsma->targetTb, &pTableMeta); } } else { @@ -12424,10 +12431,13 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS SVAlterTbReq* pReq) { SName tbName = {0}; SArray* pTsmas = NULL; - toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tbName); - int32_t code = getTableTsmasFromCache(pCxt->pMetaCache, &tbName, &pTsmas); - if (code != TSDB_CODE_SUCCESS) return code; - if (pTsmas && pTsmas->size > 0) return TSDB_CODE_TSMA_MUST_BE_DROPPED; + int32_t code = TSDB_CODE_SUCCESS; + if (pCxt->pMetaCache) { + toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tbName); + code = getTableTsmasFromCache(pCxt->pMetaCache, &tbName, &pTsmas); + if (code != TSDB_CODE_SUCCESS) return code; + if (pTsmas && pTsmas->size > 0) return TSDB_CODE_TSMA_MUST_BE_DROPPED; + } SSchema* pSchema = getTagSchema(pTableMeta, pStmt->colName); if (NULL == pSchema) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 86a2e9f1b7..ae37334762 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -5969,6 +5969,10 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) { pTsmaScanCols = taosArrayInit(pTsmaOptCtx->pAggFuncs->length, sizeof(int32_t)); if (!pTsmaScanCols) return TSDB_CODE_OUT_OF_MEMORY; } + if (pTsmaOptCtx->pScan->tableType == TSDB_CHILD_TABLE || pTsmaOptCtx->pScan->tableType == TSDB_NORMAL_TABLE) { + const STsmaTargetTbInfo* ptbInfo = taosArrayGet(pTsmaOptCtx->pScan->pTsmaTargetTbInfo, i); + if (ptbInfo->uid == 0) continue; // tsma res table meta not found, skip this tsma, this is possible when there is no data in this ctb + } STableTSMAInfo* pTsma = taosArrayGetP(pTsmaOptCtx->pTsmas, i); if (!pTsma->fillHistoryFinished || tsMaxTsmaCalcDelay * 1000 < (pTsma->rspTs - pTsma->reqTs) + pTsma->delayDuration) { diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index 71e1db22a8..565f5560ff 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -498,6 +498,18 @@ class TSMATestSQLGenerator: ret = ret + f' LIMIT {random.randint(0, self.opts_.limit_max)}' return ret + ## if offset is True, offset cannot be the same as interval + def generate_random_offset_sliding(self, interval: str, offset: bool = False) -> str: + unit = interval[-1] + hasUnit = unit.isalpha() + if not hasUnit: + start = 1 + if offset: + start = 2 + ret: int = int(int(interval) / random.randint(start, 5)) + return str(ret) + return '' + # add sliding offset def generate_interval(self, intervals: List[str]) -> str: if not self.opts_.interval: @@ -506,7 +518,17 @@ class TSMATestSQLGenerator: return '' value = random.choice(intervals) self.res_.has_interval = True - return f'INTERVAL({value})' + has_offset = False + offset = '' + has_sliding = False + sliding = '' + num: int = int(value[:-1]) + unit = value[-1] + if has_offset and num > 1: + offset = f', {self.generate_random_offset_sliding(value, True)}' + if has_sliding: + sliding = f'sliding({self.generate_random_offset_sliding(value)})' + return f'INTERVAL({value} {offset}) {sliding}' def generate_tag_list(self): used_tag_num = random.randrange(1, self.opts_.tag_num + 1) @@ -714,8 +736,12 @@ class TDTestCase: if ctx.has_tsma(): if ctx.used_tsmas[0].name == tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX: break - elif len(ctx.used_tsmas[0].name) == 32 and 1: ## select md5 - break + elif len(ctx.used_tsmas[0].name) == 32: + name = f'1.{db}.{tsma_name}_{tb}' + if ctx.used_tsmas[0].name == TSMAQCBuilder().md5(name): + break + else: + time.sleep(1) else: time.sleep(1) else: @@ -757,16 +783,14 @@ class TDTestCase: self.tsma_tester.check_sql(ctx.sql, ctx) def test_query_with_tsma(self): - self.create_tsma('tsma1', 'test', 'meters', [ - 'avg(c1)', 'avg(c2)'], '5m') - self.create_tsma('tsma2', 'test', 'meters', [ - 'avg(c1)', 'avg(c2)'], '30m') - self.create_tsma('tsma5', 'test', 'norm_tb', [ - 'avg(c1)', 'avg(c2)'], '10m') + self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') + self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m') + self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m') self.test_query_with_tsma_interval() self.test_query_with_tsma_agg() self.test_recursive_tsma() + self.test_query_interval_sliding() self.test_union() self.test_query_child_table() self.test_skip_tsma_hint() @@ -774,6 +798,43 @@ class TDTestCase: self.test_long_ctb_name() self.test_add_tag_col() self.test_modify_col_name_value() + self.test_alter_tag_val() + self.test_ins_tsma() + + def test_ins_tsma(self): + tdSql.execute('use performance_schema') + tdSql.query('show tsmas') + tdSql.checkRows(0) + tdSql.execute('use test') + tdSql.query('show tsmas') + tdSql.checkRows(3) + tdSql.query('select * from information_schema.ins_tsmas') + tdSql.checkRows(3) + tdSql.execute('create database dd') + tdSql.execute('use dd') + tdSql.execute('create table norm_tb (ts timestamp, c1 int)') + tdSql.execute('insert into norm_tb values(now, 1)') + self.create_tsma('tsma_norm_tb_dd', 'dd', 'norm_tb', ['avg(c1)', 'sum(c1)', 'min(c1)'], '10m') + tdSql.query('show tsmas') + tdSql.checkRows(1) + tdSql.query('select * from information_schema.ins_tsmas') + tdSql.checkRows(4) + tdSql.query('show test.tsmas') + tdSql.checkRows(3) + tdSql.execute('use test') + tdSql.query('show dd.tsmas') + tdSql.checkRows(1) + tdSql.execute('drop database dd') + tdSql.query('select * from information_schema.ins_tsmas') + tdSql.checkRows(3) + tdSql.execute('use test') + + def test_alter_tag_val(self): + sql = 'alter table t1 set tag t1 = 999' + tdSql.error(sql, -2147471088) + + def test_query_interval_sliding(self): + pass def test_union(self): ctxs = [] @@ -914,14 +975,21 @@ class TDTestCase: 'tsma2', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc() self.check([ctx]) + # test recrusive tsma on norm_tb + tsma_name = 'tsma_recursive_on_norm_tb' + self.create_recursive_tsma('tsma5', tsma_name, 'test', '20m', 'norm_tb', ['avg(c1)', 'avg(c2)']) + sql = 'select avg(c1), avg(c2), tbname from norm_tb partition by tbname interval(20m)' + self.check([TSMAQCBuilder().with_sql(sql).should_query_with_tsma_ctb('test', tsma_name, 'norm_tb').get_qc()]) + tdSql.execute(f'drop tsma {tsma_name}') + def test_query_with_tsma_interval(self): self.check(self.test_query_with_tsma_interval_possibly_partition()) self.check(self.test_query_with_tsma_interval_partition_by_col()) def test_query_tsma_all(self, func_list: List = ['avg(c1)', 'avg(c2)']) -> List: ctxs = [] - interval_list = ['1s', '5s', '60s', '1m', '10m', '20m', - '30m', '59s', '1h', '120s', '1200', '2h', '90m', '1d'] + interval_list = ['1s', '5s', '59s', '60s', '1m', '120s', '10m', '20m', + '30m', '1h', '90m', '2h', '8h', '1d'] opts: TSMATesterSQLGeneratorOptions = TSMATesterSQLGeneratorOptions() opts.interval = True opts.where_ts_range = True @@ -962,6 +1030,12 @@ class TDTestCase: sql = 'select avg(c1), avg(c2) from meters interval(60m)' ctxs.append(TSMAQCBuilder().with_sql(sql) .should_query_with_tsma('tsma2', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()) + sql = 'select avg(c1), avg(c2) from meters interval(60m, 30m) SLIDING(30m)' + ctxs.append(TSMAQCBuilder().with_sql(sql) + .should_query_with_tsma('tsma2', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()) + sql = 'select avg(c1), avg(c2) from meters interval(60m, 25m) SLIDING(25m)' + ctxs.append(TSMAQCBuilder().with_sql(sql) + .should_query_with_tsma('tsma1', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()) sql = "select avg(c1), avg(c2) from meters where ts >= '2018-09-17 09:00:00.009' and ts < '2018-09-17 10:23:19.665' interval(30m)" ctxs.append(TSMAQCBuilder().with_sql(sql) @@ -969,6 +1043,12 @@ class TDTestCase: .should_query_with_tsma('tsma2', '2018-09-17 09:30:00', '2018-09-17 09:59:59.999') .should_query_with_table('meters', '2018-09-17 10:00:00.000', '2018-09-17 10:23:19.664').get_qc()) + sql = "select avg(c1), avg(c2) from meters where ts >= '2018-09-17 09:00:00.009' and ts < '2018-09-17 10:23:19.665' interval(30m, 25m) SLIDING(10m)" + ctxs.append(TSMAQCBuilder().with_sql(sql) + .should_query_with_table('meters', '2018-09-17 09:00:00.009', '2018-09-17 09:04:59.999') + .should_query_with_tsma('tsma1', '2018-09-17 09:05:00', '2018-09-17 09:54:59.999') + .should_query_with_table('meters', '2018-09-17 09:55:00.000', '2018-09-17 10:23:19.664').get_qc()) + sql = "SELECT avg(c1), avg(c2),_wstart, _wend,t3,t4,t5,t2 FROM meters WHERE ts >= '2018-09-17 8:00:00' AND ts < '2018-09-17 09:03:18.334' PARTITION BY t3,t4,t5,t2 INTERVAL(1d);" ctxs.append(TSMAQCBuilder().with_sql(sql) .should_query_with_table('meters', '2018-09-17 8:00:00', '2018-09-17 09:03:18.333').get_qc()) @@ -1116,9 +1196,6 @@ class TDTestCase: self.test_ddl() self.test_query_with_tsma() - def test_ins_tsma(self): - pass - def test_create_tsma(self): function_name = sys._getframe().f_code.co_name tdLog.debug(f'-----{function_name}------')