diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2b83e62dc2..f7a66bc50b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -447,9 +447,9 @@ typedef enum ENodeType { } ENodeType; typedef struct { - int32_t vgId; - char* dbFName; - char* tbName; + int32_t vgId; + const char* dbFName; + const char* tbName; } SBuildTableInput; typedef struct { @@ -3570,6 +3570,8 @@ typedef struct { int64_t lastTs; int64_t normSourceTbUid; // the Uid of source tb if its a normal table, otherwise 0 SArray* pVgroupVerList; + int8_t recursiveTsma; + char baseTsmaName[TSDB_TABLE_FNAME_LEN]; // base tsma name for recursively created tsma } SMCreateSmaReq; int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 8330974344..21a83d620f 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -423,6 +423,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) #define TSDB_CODE_MND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0481) #define TSDB_CODE_MND_INVALID_SMA_OPTION TAOS_DEF_ERROR_CODE(0, 0x0482) +#define TSDB_CODE_MND_INVALID_DROP_TSMA TAOS_DEF_ERROR_CODE(0, 0x0485) // mnode-tag-indxe diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ff54a15917..0992e74c66 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -895,6 +895,8 @@ int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq if (tEncodeI32(&encoder, p->vgId) < 0) return -1; if (tEncodeI64(&encoder, p->ver) < 0) return -1; } + if (tEncodeI8(&encoder, pReq->recursiveTsma) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->baseTsmaName) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -967,6 +969,8 @@ int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pR } } } + if (tDecodeI8(&decoder, &pReq->recursiveTsma) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->baseTsmaName) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); return 0; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index cd8c3c9ea5..fec4d958ca 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -464,6 +464,7 @@ typedef struct { char* ast; SSchemaWrapper schemaRow; // for dstVgroup SSchemaWrapper schemaTag; // for dstVgroup + char baseSmaName[TSDB_TABLE_FNAME_LEN]; } SSmaObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 16a64737dd..1a211df4b2 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -70,6 +70,7 @@ typedef struct SCreateTSMACxt { const SDbObj * pDb; SStbObj * pSrcStb; SSmaObj * pSma; + const SSmaObj * pRecursiveSma; SCMCreateStreamReq *pCreateStreamReq; SMDropStreamReq * pDropStreamReq; const char * streamName; @@ -152,6 +153,7 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { if (pSma->astLen > 0) { SDB_SET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER) } + SDB_SET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) @@ -235,6 +237,7 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) { if (pSma->ast == NULL) goto _OVER; SDB_GET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER) } + SDB_GET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER) SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) @@ -1390,6 +1393,7 @@ static void initSMAObj(SCreateTSMACxt* pCxt) { memcpy(pCxt->pSma->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN); memcpy(pCxt->pSma->stb, pCxt->pCreateSmaReq->stb, TSDB_TABLE_FNAME_LEN); memcpy(pCxt->pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN); + if (pCxt->pRecursiveSma) memcpy(pCxt->pSma->baseSmaName, pCxt->pRecursiveSma->name, TSDB_TABLE_FNAME_LEN); pCxt->pSma->createdTime = taosGetTimestampMs(); pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN); @@ -1447,9 +1451,9 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) { pCxt->pCreateStreamReq->targetStbUid = 0; pCxt->pCreateStreamReq->fillNullCols = NULL; pCxt->pCreateStreamReq->igUpdate = 0; - // TODO what's this timestamp - //pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs; - pCxt->pCreateStreamReq->lastTs = 1758414148000; + pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs; + mDebug("tsma create stream with last ts: %" PRId64 "vgversion size: %d", pCxt->pCreateSmaReq->lastTs, + pCxt->pCreateStreamReq->pVgroupVerList ? pCxt->pCreateStreamReq->pVgroupVerList->size : 0); pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast); pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql); @@ -1601,6 +1605,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { SDbObj * pDb = NULL; SStbObj * pStb = NULL; SSmaObj * pSma = NULL; + SSmaObj * pRecursiveTsma = NULL; SStreamObj * pStream = NULL; int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId); SMCreateSmaReq createReq = {0}; @@ -1664,6 +1669,15 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { goto _OVER; } + if (createReq.recursiveTsma) { + pRecursiveTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName); + if (!pRecursiveTsma) { + mError("base tsma: %s not found when creating recursive tsma", createReq.baseTsmaName); + terrno = TSDB_CODE_MND_SMA_NOT_EXIST; + goto _OVER; + } + } + SCreateTSMACxt cxt = { .pMnode = pMnode, .pCreateSmaReq = &createReq, @@ -1673,6 +1687,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { .pDb = pDb, .pRpcReq = pReq, .pSma = NULL, + .pRecursiveSma = pRecursiveTsma, .pSrcStb = pStb, }; @@ -1685,6 +1700,7 @@ _OVER: } if (pStb) mndReleaseStb(pMnode, pStb); + if (pRecursiveTsma) mndReleaseSma(pMnode, pRecursiveTsma); mndReleaseSma(pMnode, pSma); mndReleaseStream(pMnode, pStream); mndReleaseDb(pMnode, pDb); @@ -1759,6 +1775,22 @@ _OVER: return code; } +static bool hasRecursiveTsmasBasedOnMe(SMnode* pMnode, const SSmaObj* pSma) { + SSmaObj *pSmaObj = NULL; + void * pIter = NULL; + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSmaObj); + if (pIter == NULL) break; + if (0 == strncmp(pSmaObj->baseSmaName, pSma->name, TSDB_TABLE_FNAME_LEN)) { + sdbRelease(pMnode->pSdb, pSmaObj); + sdbCancelFetch(pMnode->pSdb, pIter); + return true; + } + sdbRelease(pMnode->pSdb, pSmaObj); + } + return false; +} + static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) { int32_t code = -1; SMDropSmaReq dropReq = {0}; @@ -1795,6 +1827,11 @@ static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) { goto _OVER; } + if (hasRecursiveTsmasBasedOnMe(pMnode, pSma)) { + terrno = TSDB_CODE_MND_INVALID_DROP_TSMA; + goto _OVER; + } + SCreateTSMACxt cxt = { .pDb = pDb, .pMnode = pMnode, @@ -1934,7 +1971,7 @@ static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) { taosMemoryFree(p); } -int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STableTSMAInfo* pInfo) { +int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STableTSMAInfo* pInfo, const SSmaObj* pBaseTsma) { int32_t code = 0; pInfo->interval = pSma->interval; pInfo->unit = pSma->intervalUnit; @@ -1955,7 +1992,7 @@ int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STa if (!pInfo->pFuncs) return TSDB_CODE_OUT_OF_MEMORY; SNode *pNode, *pFunc; - if (TSDB_CODE_SUCCESS != nodesStringToNode(pSma->ast, &pNode)) { + if (TSDB_CODE_SUCCESS != nodesStringToNode(pBaseTsma ? pBaseTsma->ast : pSma->ast, &pNode)) { taosArrayDestroy(pInfo->pFuncs); pInfo->pFuncs = NULL; return TSDB_CODE_TSMA_INVALID_STAT; @@ -2004,9 +2041,37 @@ int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STa return code; } +// @note remember to mndReleaseSma(*ppOut) +static int32_t mndGetDeepestBaseForTsma(SMnode* pMnode, SSmaObj* pSma, SSmaObj** ppOut) { + int32_t code = 0; + SSmaObj* pRecursiveTsma = NULL; + if (pSma->baseSmaName[0]) { + pRecursiveTsma = mndAcquireSma(pMnode, pSma->baseSmaName); + if (!pRecursiveTsma) { + mError("base tsma: %s for tsma: %s not found", pSma->baseSmaName, pSma->name); + return TSDB_CODE_MND_SMA_NOT_EXIST; + } + while (pRecursiveTsma->baseSmaName[0]) { + // TODO test 2 level recursive tsma + SSmaObj* pTmpSma = pRecursiveTsma; + pRecursiveTsma = mndAcquireSma(pMnode, pTmpSma->baseSmaName); + if (!pRecursiveTsma) { + mError("base tsma: %s for tsma: %s not found", pTmpSma->baseSmaName, pTmpSma->name); + mndReleaseSma(pMnode, pTmpSma); + return TSDB_CODE_MND_SMA_NOT_EXIST; + } + mndReleaseSma(pMnode, pTmpSma); + } + } + *ppOut = pRecursiveTsma; + return code; +} + + static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) { int32_t code = -1; SSmaObj *pSma = NULL; + SSmaObj *pBaseTsma = NULL; SStbObj *pDstStb = NULL; pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, tsmaFName); @@ -2025,10 +2090,17 @@ static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rs return code; } - terrno = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma); + terrno = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma); + if (terrno == 0) { + terrno = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma, pBaseTsma); + } mndReleaseStb(pMnode, pDstStb); sdbRelease(pMnode->pSdb, pSma); - if (terrno) return code; + if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma); + if (terrno) { + tFreeTableTSMAInfo(pTsma); + return code; + } if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) { terrno = TSDB_CODE_OUT_OF_MEMORY; tFreeTableTSMAInfo(pTsma); @@ -2041,6 +2113,7 @@ static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rs static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *rsp, bool *exist) { int32_t code = -1; SSmaObj * pSma = NULL; + SSmaObj * pBaseTsma = NULL; SSdb * pSdb = pMnode->pSdb; void * pIter = NULL; SStreamObj * pStreamObj = NULL; @@ -2088,11 +2161,19 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp sdbRelease(pSdb, pSma); return code; } - terrno = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma); pTsma->streamUid = streamId; + + terrno = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma); + if (terrno == 0) { + terrno = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma, pBaseTsma); + } mndReleaseStb(pMnode, pStb); sdbRelease(pSdb, pSma); - if (terrno) return code; + if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma); + if (terrno) { + tFreeTableTSMAInfo(pTsma); + return code; + } if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) { terrno = TSDB_CODE_OUT_OF_MEMORY; tFreeTableTSMAInfo(pTsma); @@ -2153,7 +2234,8 @@ static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) { _OVER: if (code != 0) { - mError("failed to get table tsma %s since %s", tsmaReq.name, terrstr()); + mError("failed to get table tsma %s since %s fetching with tsma name %d", tsmaReq.name, terrstr(), + tsmaReq.fetchingWithTsmaName); } tFreeTableTSMAInfoRsp(&rsp); @@ -2227,16 +2309,26 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t // dump smaObj into rsp STableTSMAInfo * pInfo = NULL; pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo)); - if (!pInfo || (terrno = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo))) { + if (!pInfo) { + terrno = TSDB_CODE_OUT_OF_MEMORY; mndReleaseSma(pMnode, pSma); mndReleaseStb(pMnode, pDestStb); - taosMemoryFreeClear(pInfo); + goto _OVER; + } + + SSmaObj* pBaseSma = NULL; + terrno = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseSma); + if (terrno == 0) terrno = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo, pBaseSma); + + mndReleaseStb(pMnode, pDestStb); + mndReleaseSma(pMnode, pSma); + if (pBaseSma) mndReleaseSma(pMnode, pBaseSma); + if (terrno) { + tFreeTableTSMAInfo(pInfo); goto _OVER; } taosArrayPush(hbRsp.pTsmas, pInfo); - mndReleaseStb(pMnode, pDestStb); - mndReleaseSma(pMnode, pSma); } rspLen = tSerializeTSMAHbRsp(NULL, 0, &hbRsp); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index f9a41986a0..af31d03698 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -563,8 +563,9 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b } } // TODO remove log - tqInfo("------ver: %" PRId64 " fhFinished: %d max: %" PRId64 " cur: %" PRId64 " latest: %" PRId64, ver, *fhFinished, - verRange.maxVer, cur, latest); + tqInfo("------ver: %" PRId64 " fhFinished: %d max: %" PRId64 " cur: %" PRId64 " latest: %" PRId64 + "lastst-cur %" PRId64, + ver, *fhFinished, verRange.maxVer, cur, latest, latest - cur); if (pDelay != NULL) { // delay in ms *pDelay = (latest - cur) / 1000; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 819ba6362a..f66104ccdf 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -972,7 +972,7 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation* operation); int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char* dbFName, SCtgDBCache** pCache); void ctgReleaseDBCache(SCatalog* pCtg, SCtgDBCache* dbCache); void ctgRUnlockVgInfo(SCtgDBCache* dbCache); -int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char* dbFName, char* tbName, int32_t* exist); +int32_t ctgTbMetaExistInCache(SCatalog* pCtg, const char* dbFName, const char* tbName, int32_t* exist); int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta); int32_t ctgReadTbVerFromCache(SCatalog* pCtg, SName* pTableName, int32_t* sver, int32_t* tver, int32_t* tbType, uint64_t* suid, char* stbName); @@ -1033,7 +1033,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ch SCtgTask* pTask); int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, SGetUserAuthRsp* out, SCtgTask* pTask); -int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName, +int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, const char* tbName, STableMetaOutput* out, SCtgTaskReq* tReq); int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out, SCtgTaskReq* tReq); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index d1bb1f1343..21da6857e6 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -2707,8 +2707,22 @@ int32_t ctgLaunchGetTSMATask(SCtgTask* pTask) { taosArrayPush(pCtx->pResList, &(SMetaRes){0}); CTG_ERR_RET(ctgGetTbTSMAFromMnode(pCtg, pConn, pTsmaName, NULL, &tReq, TDMT_MND_GET_TSMA)); } else { + SMetaRes* pRes = taosArrayGet(pCtx->pResList, 0); + STableTSMAInfoRsp* pRsp = (STableTSMAInfoRsp*)pRes->pRes; + ASSERT(pRsp->pTsmas->size == 1); + const STSMACache* pTsma = taosArrayGetP(pRsp->pTsmas, 0); TSWAP(pTask->res, pCtx->pResList); - CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); + // get tsma target stable meta if not existed in cache + int32_t exists = false; + CTG_ERR_RET(ctgTbMetaExistInCache(pCtg, pTsma->targetDbFName, pTsma->targetTb, &exists)); + if (!exists) { + SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = 0}; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, 0); + if (!pMsgCtx->pBatchs) pMsgCtx->pBatchs = pJob->pBatchs; + CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pTsma->targetDbFName, pTsma->targetTb, NULL, &tReq)); + } else { + CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); + } return TSDB_CODE_SUCCESS; } @@ -2742,23 +2756,13 @@ int32_t ctgHandleGetTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pRes->pRes = pOut; pMsgCtx->out = NULL; TSWAP(pTask->res, pCtx->pResList); - break; + STableTSMAInfo* pTsma = taosArrayGetP(pOut->pTsmas, 0); - - SName dstTbName = *pName; - strcpy(dstTbName.tname, pTsma->targetTb); - - SCtgTbMetaCtx stbCtx = {0}; - stbCtx.flag = CTG_FLAG_STB; - stbCtx.pName = &dstTbName; - STableMeta* pDstTbMeta = NULL; - (void)ctgReadTbMetaFromCache(pCtg, &stbCtx, &pDstTbMeta); - - if (!pDstTbMeta) { + int32_t exists = false; + CTG_ERR_JRET(ctgTbMetaExistInCache(pCtg, pTsma->targetDbFName, pTsma->targetTb, &exists)); + if (!exists) { TSWAP(pMsgCtx->lastOut, pMsgCtx->out); - CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pTsma->targetDbFName, dstTbName.tname, NULL, tReq)); - } else { - taosMemoryFreeClear(pDstTbMeta); + CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pTsma->targetDbFName, pTsma->targetTb, NULL, tReq)); } } } break; @@ -2781,7 +2785,6 @@ _return: int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { bool taskDone = false; - bool hasSubFetch = false; int32_t code = 0; SCtgTask* pTask = tReq->pTask; SCatalog* pCtg = pTask->pJob->pCtg; @@ -2830,7 +2833,6 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf .vgId = pVgInfo->vgId}; CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req)); pFetch->subFetchNum++; - hasSubFetch = true; pVgInfo = taosHashIterate(pDbCache->vgCache.vgInfo->vgHash, pVgInfo); } } @@ -2893,7 +2895,6 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf .vgId = pVgInfo->vgId}; CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req)); pFetch->subFetchNum++; - hasSubFetch = true; pVgInfo = taosHashIterate(pVgHash, pVgInfo); } } @@ -2916,7 +2917,6 @@ _return: if (code) { SMetaRes* pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx); pRes->code = code; - pRes->pRes = NULL; if (TSDB_CODE_MND_SMA_NOT_EXIST == code) { code = TSDB_CODE_SUCCESS; } else { @@ -2926,10 +2926,10 @@ _return: tstrerror(code)); } bool allSubFetchFinished = false; - if (reqType == TDMT_VND_GET_STREAM_PROGRESS) { + if (pMsgCtx->reqType == TDMT_VND_GET_STREAM_PROGRESS) { allSubFetchFinished = atomic_add_fetch_32(&pFetch->finishedSubFetchNum, 1) >= pFetch->subFetchNum; } - if ((allSubFetchFinished || !hasSubFetch) && 0 == atomic_sub_fetch_32(&pCtx->fetchNum, 1)) { + if ((allSubFetchFinished || pFetch->subFetchNum == 0) && 0 == atomic_sub_fetch_32(&pCtx->fetchNum, 1)) { TSWAP(pTask->res, pCtx->pResList); taskDone = true; } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index a33a935e75..768320ca2c 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -259,7 +259,7 @@ _return: return TSDB_CODE_SUCCESS; } -int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) { +int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) { SCtgDBCache *dbCache = NULL; SCtgTbCache *pCache = NULL; ctgAcquireDBCache(pCtg, dbFName, &dbCache); @@ -503,7 +503,7 @@ _return: return TSDB_CODE_SUCCESS; } -int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *exist) { +int32_t ctgTbMetaExistInCache(SCatalog *pCtg, const char *dbFName, const char *tbName, int32_t *exist) { SCtgDBCache *dbCache = NULL; SCtgTbCache *tbCache = NULL; ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 6dd38cac48..8454382880 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -1143,7 +1143,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const return TSDB_CODE_SUCCESS; } -int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName, +int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, const char* tbName, STableMetaOutput* out, SCtgTaskReq* tReq) { SCtgTask* pTask = tReq ? tReq->pTask : NULL; SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName}; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ababa5419f..f6b9669004 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -321,7 +321,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal); static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode** pFunc); static int32_t createSimpleSelectStmtFromProjList(const char* pDb, const char* pTable, SNodeList* pProjectionList, SSelectStmt** pStmt); -static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta, SNode** pQuery); +static int32_t createLastTsSelectStmt(char* pDb, const char* pTable, const char* pkColName, SNode** pQuery); static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery); static int32_t setRefreshMeta(STranslateContext* pCxt, SQuery* pQuery); @@ -8326,7 +8326,7 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pMetaCache); if (TSDB_CODE_SUCCESS == code) { pStmt->pOptions->tsPrecision = pMetaCache->tableInfo.precision; - code = createLastTsSelectStmt(pStmt->dbName, pStmt->tableName, pMetaCache, &pStmt->pPrevQuery); + code = createLastTsSelectStmt(pStmt->dbName, pStmt->tableName, pMetaCache->schema[0].name, &pStmt->pPrevQuery); } taosMemoryFreeClear(pMetaCache); } @@ -9622,14 +9622,14 @@ static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStream return code; } -static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta, SNode** pQuery) { +static int32_t createLastTsSelectStmt(char* pDb, const char* pTable, const char* pkColName, SNode** pQuery) { SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == col) { return TSDB_CODE_OUT_OF_MEMORY; } tstrncpy(col->tableAlias, pTable, tListLen(col->tableAlias)); - tstrncpy(col->colName, pMeta->schema[0].name, tListLen(col->colName)); + tstrncpy(col->colName, pkColName, tListLen(col->colName)); SNodeList* pParameterList = nodesMakeList(); if (NULL == pParameterList) { nodesDestroyNode((SNode*)col); @@ -9812,7 +9812,8 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt } if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->fillHistory) { SRealTableNode* pTable = (SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable); - code = createLastTsSelectStmt(pTable->table.dbName, pTable->table.tableName, pTable->pMeta, &pStmt->pPrevQuery); + code = createLastTsSelectStmt(pTable->table.dbName, pTable->table.tableName, pTable->pMeta->schema[0].name, + &pStmt->pPrevQuery); /* if (TSDB_CODE_SUCCESS == code) { STranslateContext cxt = {0}; @@ -10624,6 +10625,8 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm // useTbName is base tsma name code = getTsma(pCxt, useTbName, &pRecursiveTsma); if (code == TSDB_CODE_SUCCESS) { + pReq->recursiveTsma = true; + tNameExtractFullName(useTbName, pReq->baseTsmaName); SValueNode* pInterval = (SValueNode*)pStmt->pOptions->pInterval; if (pRecursiveTsma->interval < pInterval->datum.i && pInterval->datum.i % pRecursiveTsma->interval == 0) { } else { @@ -10649,6 +10652,7 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm numOfTags = pRecursiveTsma->pTags->size; pCols = pRecursiveTsma->pUsedCols->pData; pTags = pRecursiveTsma->pTags->pData; + code = getTableMeta(pCxt, pStmt->dbName, pRecursiveTsma->targetTb, &pTableMeta); } } else { code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta); @@ -10679,11 +10683,10 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm code = buildTSMAAst(pCxt, pStmt, pReq, pStmt->pOptions->recursiveTsma ? pRecursiveTsma->targetTb : pStmt->tableName, numOfTags, pTags); } - if (TSDB_CODE_SUCCESS == code && !pStmt->pOptions->recursiveTsma) { //TODO remvoe recursive tsma check - if (TSDB_CODE_SUCCESS == code) { - pStmt->pOptions->tsPrecision = pTableMeta->tableInfo.precision; - code = createLastTsSelectStmt(pStmt->dbName, pStmt->tableName, pTableMeta, &pStmt->pPrevQuery); - } + if (TSDB_CODE_SUCCESS == code) { + const char* pkColName = pTableMeta->schema[0].name; + const char* tbName = pStmt->pOptions->recursiveTsma ? pRecursiveTsma->targetTb : pStmt->tableName; + code = createLastTsSelectStmt(pStmt->dbName, tbName, pkColName, &pStmt->pPrevQuery); } taosMemoryFreeClear(pTableMeta); diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index fe0010859a..7a287041e3 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -1189,6 +1189,8 @@ int32_t getTsmaFromCache(SParseMetaCache* pMetaCache, const SName* pTsmaName, ST if (TSDB_CODE_SUCCESS == code && pTsmaRsp) { ASSERT(pTsmaRsp->pTsmas->size == 1); *pTsma = taosArrayGetP(pTsmaRsp->pTsmas, 0); + } else if (code == TSDB_CODE_PAR_INTERNAL_ERROR){ + code = TSDB_CODE_MND_SMA_NOT_EXIST; } return code; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 068ec94268..d45b8a347f 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -5937,7 +5937,11 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) { } STableTSMAInfo* pTsma = taosArrayGetP(pTsmaOptCtx->pTsmas, i); - if (!pTsma->fillHistoryFinished || 30 * 1000 < (pTsma->rspTs - pTsma->reqTs) + pTsma->delayDuration) continue; + if (!pTsma->fillHistoryFinished || 30 * 1000 < (pTsma->rspTs - pTsma->reqTs) + pTsma->delayDuration) { + qInfo("tsma %s filtered out history: %d rspTs: %ld reqTs: %ld delay: %ld, rspTs - reqTs: %ld", pTsma->name, + pTsma->fillHistoryFinished, pTsma->rspTs, pTsma->reqTs, pTsma->delayDuration, pTsma->rspTs - pTsma->reqTs); + continue; + } // filter with interval // TODO unit not right if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 16238d335a..34d0670cd4 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -325,6 +325,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TARGET_TABLE, "Cannot write the same TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists in db") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "sma not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma option") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DROP_TSMA, "Invalid drop base tsma, drop recursive tsma first") + // mnode-view TAOS_DEFINE_ERROR(TSDB_CODE_MND_VIEW_ALREADY_EXIST, "view already exists in db") diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index b5e0db646b..2f749c1e79 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -1,7 +1,4 @@ -from os import name from random import randrange -from socket import TIPC_ADDR_NAMESEQ -import taos import time import threading @@ -50,7 +47,7 @@ class UsedTsma: def __repr__(self) -> str: return self.__str__() - + def setIsTsma(self): self.is_tsma_ = self.name.endswith(self.TSMA_RES_STB_POSTFIX) @@ -92,7 +89,7 @@ class TSMAQCBuilder: def with_sql(self, sql: str): self.qc_.sql = sql return self - + def to_timestamp(self, ts: str) -> float: if ts == UsedTsma.TS_MAX or ts == UsedTsma.TS_MIN: return float(ts) @@ -220,11 +217,11 @@ class TSMATestSQLGenerator: self.where_list_: List[str] = [] self.group_or_partition_by_list: List[str] = [] self.interval: str = '' - + def get_random_type(self, funcs): rand: int = randrange(1, len(funcs)) return funcs[rand-1]() - + def generate_one(self) -> str: pass @@ -245,7 +242,7 @@ class TSMATestSQLGenerator: sql = ' ts ' if len(left) > 0: sql += '%s ' % (left) - + if len(right) > 0: if len(sql) > 0: sql += 'and ts ' @@ -336,7 +333,7 @@ class TDTestCase: tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName)) return - + def init_normal_tb(self, tsql, db_name: str, tb_name: str, rows: int, start_ts: int, ts_step: int): sql = 'CREATE TABLE %s.%s (ts timestamp, c1 INT, c2 INT, c3 VARCHAR(255), c4 INT)' % (db_name, tb_name) tsql.execute(sql) @@ -413,7 +410,7 @@ class TDTestCase: def wait_for_tsma_calculation(self, func_list: list, db: str, tb: str, interval: str, tsma_name: str): while True: sql = 'select %s from %s.%s interval(%s)' % (', '.join(func_list), db, tb, interval) - tdLog.debug('waiting for tsma %s to be useful with sql %s' % (tsma_name, sql)) + tdLog.debug(f'waiting for tsma {db}.{tsma_name} to be useful with sql {sql}') ctx: TSMAQueryContext = self.tsma_tester.get_tsma_query_ctx(sql) if ctx.has_tsma(): if ctx.used_tsmas[0].name == tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX: @@ -430,11 +427,11 @@ class TDTestCase: tdSql.execute(sql, queryTimes=1) self.wait_for_tsma_calculation(func_list, db, tb, interval, tsma_name) - def create_recursive_tsma(self, base_tsma_name: str, new_tsma_name: str, db: str, interval: str, tb_name: str): + def create_recursive_tsma(self, base_tsma_name: str, new_tsma_name: str, db: str, interval: str, tb_name: str, func_list: List[str] = ['avg(c1)']): tdSql.execute('use %s' % db, queryTimes=1) sql = 'CREATE RECURSIVE TSMA %s ON %s.%s INTERVAL(%s)' % (new_tsma_name, db, base_tsma_name, interval) tdSql.execute(sql, queryTimes=1) - self.wait_for_tsma_calculation(['avg(c1)'], db, tb_name, interval, new_tsma_name) + self.wait_for_tsma_calculation(func_list, db, tb_name, interval, new_tsma_name) def drop_tsma(self, tsma_name: str, db: str): sql = 'DROP TSMA %s.%s' % (db, tsma_name) @@ -449,7 +446,7 @@ class TDTestCase: break if not plan_found: tdLog.exit("plan: %s not found in res: [%s]" % (plan_str_expect, str(explain_output))) - + def check(self, func): for ctx in func(): self.tsma_tester.check_sql(ctx.sql, ctx) @@ -457,20 +454,35 @@ class TDTestCase: def test_query_with_tsma(self): self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m') - #self.create_recursive_tsma('tsma1', 'tsma3', 'test', '20m', 'meters') - #self.create_recursive_tsma('tsma2', 'tsma4', 'test', '1h', 'meters') self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m') - ## why need 10s, filling history not finished yet - #ctx = TSMAQCBuilder().with_sql('select avg(c1) from meters').should_query_with_table('meters', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc() - #self.tsma_tester.check_sql(ctx.sql, ctx) - #time.sleep(5) - #time.sleep(9999999) + self.test_query_with_tsma_interval() self.test_query_with_tsma_agg() + self.test_recursive_tsma() ## self.test_query_with_drop_tsma() ## self.test_query_with_add_tag() ## self.test_union() + def test_recursive_tsma(self): + tdSql.execute('drop tsma tsma2') + func_list: List[str] = ['avg(c2)', 'avg(c3)'] + self.create_tsma('tsma3', 'test', 'meters', func_list, '5m') + self.create_recursive_tsma('tsma3', 'tsma4', 'test', '20m', 'meters', func_list) + ## now we have 5m, 10m, 30m, 1h 4 tsmas + sql = 'select avg(c2), "recursive tsma4" from meters' + ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma4', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc() + self.tsma_tester.check_sql(sql, ctx) + self.create_recursive_tsma('tsma4', 'tsma6', 'test', '1h', 'meters', func_list) + ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma6', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc() + self.tsma_tester.check_sql(sql, ctx) + + tdSql.error('drop tsma tsma3', -2147482491) + tdSql.error('drop tsma tsma4', -2147482491) + tdSql.execute('drop tsma tsma6') + tdSql.execute('drop tsma tsma4') + tdSql.execute('drop tsma tsma3') + self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m') + def test_query_with_tsma_interval(self): self.check(self.test_query_with_tsma_interval_no_partition) self.check(self.test_query_with_tsma_interval_partition_by_col) @@ -493,7 +505,7 @@ class TDTestCase: sql = 'select avg(c1), avg(c2) from meters interval(60m)' ctxs.append(TSMAQCBuilder().with_sql(sql) \ .should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc()) - + sql = "select avg(c1), avg(c2) from meters where ts >= '2018-09-17 09:00:00.009' and ts < '2018-09-17 10:23:19.665' interval(30m)" ctxs.append(TSMAQCBuilder().with_sql(sql) \ .should_query_with_table('meters', '2018-09-17 09:00:00.009','2018-09-17 09:29:59.999') \ @@ -546,7 +558,7 @@ class TDTestCase: .should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \ .should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \ .should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_qc()) - + sql = 'select avg(c1) + avg(c2) from meters where tbname like "%t1%"' ctxs.append(TSMAQCBuilder().with_sql(sql) \ .should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc()) @@ -558,7 +570,7 @@ class TDTestCase: sql = 'select avg(c1), avg(c2), spread(c4) from meters' ctxs.append(TSMAQCBuilder().with_sql(sql) \ .should_query_with_table('meters', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc()) - + return ctxs def test_query_with_tsma_agg_group_by_tbname(self): @@ -574,21 +586,51 @@ class TDTestCase: self.init_data() #time.sleep(999999) self.test_create_tsma() - #self.test_drop_tsma() + self.test_drop_tsma() + #time.sleep(9999999) self.test_tb_ddl_with_created_tsma() self.test_query_with_tsma() #time.sleep(999999) - + def test_create_tsma(self): + function_name = sys._getframe().f_code.co_name + tdLog.debug(f'-----{function_name}------') self.test_create_tsma_on_stable() self.test_create_tsma_on_norm_table() self.test_create_tsma_on_child_table() self.test_create_recursive_tsma() ## self.test_drop_stable() ## self.test_drop_ctable() - ## self.test_drop_db() + self.test_drop_db() + + def test_drop_tsma(self): + function_name = sys._getframe().f_code.co_name + tdLog.debug(f'-----{function_name}------') + self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') + self.create_recursive_tsma('tsma1', 'tsma2', 'test', '15m', 'meters') + + tdSql.error('drop tsma tsma1', -2147482491) ## drop recursive tsma first + tdSql.execute('drop tsma tsma2', queryTimes=1) + tdSql.execute('drop tsma tsma1', queryTimes=1) + tdSql.execute('drop database test', queryTimes=1) + + self.init_data() + + def test_drop_db(self): + function_name = sys._getframe().f_code.co_name + tdLog.debug(f'-----{function_name}------') + tdSql.execute('create database nsdb precision "ns"', queryTimes=1) + tdSql.execute('use nsdb', queryTimes=1) + tdSql.execute('create table meters(ts timestamp, c1 int, c2 int) tags(t1 int, t2 int)', queryTimes=1) + ## TODO insert data + self.create_tsma('tsma1', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '5m') + self.create_recursive_tsma('tsma1', 'tsma2', 'nsdb', '10m', 'meters') + tdSql.query('select avg(c1) from meters', queryTimes=1) + tdSql.execute('drop database nsdb', queryTimes=1) def test_tb_ddl_with_created_tsma(self): + function_name = sys._getframe().f_code.co_name + tdLog.debug(f'-----{function_name}------') tdSql.execute('create database nsdb precision "ns"', queryTimes=1) tdSql.execute('use nsdb', queryTimes=1) tdSql.execute('create table meters(ts timestamp, c1 int, c2 int) tags(t1 int, t2 int)', queryTimes=1) @@ -604,9 +646,11 @@ class TDTestCase: tdSql.execute('alter table meters drop tag t3', queryTimes=1) tdSql.execute('drop database nsdb') - ## test_drop stream + ## TODO test drop stream def test_create_tsma_on_stable(self): + function_name = sys._getframe().f_code.co_name + tdLog.debug(f'-----{function_name}------') tdSql.execute('create database nsdb precision "ns"', queryTimes=1) tdSql.execute('use nsdb', queryTimes=1) tdSql.execute('create table meters(ts timestamp, c1 int, c2 int) tags(t1 int, t2 int)', queryTimes=1) @@ -618,18 +662,27 @@ class TDTestCase: tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(999999b)', -2147471097) tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(999u)', -2147471097) - tdSql.execute('drop tsma tsma1') + tdSql.execute('drop tsma tsma1', queryTimes=1) + tdSql.execute('use test', queryTimes=1) + tdSql.execute('create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2)) interval(10m)', queryTimes=1) + self.wait_for_tsma_calculation(['avg(c1)', 'avg(c2)'], 'nsdb', 'meters', '10m', 'tsma1') + tdSql.execute('drop tsma nsdb.tsma1', queryTimes=1) tdSql.error('create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097) tdSql.execute('drop database nsdb') def test_create_tsma_on_norm_table(self): - pass + function_name = sys._getframe().f_code.co_name + tdLog.debug(f'-----{function_name}------') def test_create_tsma_on_child_table(self): + function_name = sys._getframe().f_code.co_name + tdLog.debug(f'-----{function_name}------') tdSql.error('create tsma tsma1 on test.t1 function(avg(c1), avg(c2)) interval(1m)', -2147471098) ## Invalid table to create tsma, only stable or normal table allowed def test_create_recursive_tsma(self): + function_name = sys._getframe().f_code.co_name + tdLog.debug(f'-----{function_name}------') tdSql.execute('use test') self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') sql = 'create recursive tsma tsma2 on tsma1 interval(1m)'