diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 071b39ce92..e4e50f69ee 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -110,6 +110,8 @@ typedef struct SScanLogicNode { int8_t igCheckUpdate; SArray* pSmaIndexes; SArray* pTsmas; + SArray* pTsmaTargetCTbVgInfo; + SArray* pTsmaTargetCTbInfo; SNodeList* pGroupTags; bool groupSort; SNodeList* pTags; // for create stream diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 99fff8cdea..3f3d4cf031 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -190,6 +190,11 @@ typedef struct STableNode { struct STableMeta; +typedef struct STsmaTargetCTbInfo { + char ctableName[TSDB_TABLE_NAME_LEN]; + uint64_t uid; +} STsmaTargetCTbInfo; + typedef struct SRealTableNode { STableNode table; // QUERY_NODE_REAL_TABLE struct STableMeta* pMeta; @@ -199,6 +204,8 @@ typedef struct SRealTableNode { SArray* pSmaIndexes; int8_t cacheLastMode; SArray* pTsmas; + SArray* tsmaTargetCTbVgInfo; // SArray + SArray* tsmaTargetCTbInfo; // SArray } SRealTableNode; typedef struct STempTableNode { diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index aa6c9ab3e8..331f9b6cde 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -263,9 +263,9 @@ typedef struct SCtgViewsCtx { } SCtgViewsCtx; typedef enum { - FETCH_TB_META, - FETCH_TSMA_FOR_TB, - FETCH_PROGRESS_FOR_TSMA, + FETCH_TSMA_SOURCE_TB_META, + FETCH_TB_TSMA, + FETCH_TSMA_STREAM_PROGRESS, } CTG_TSMA_FETCH_TYPE; typedef struct SCtgTSMAFetch { @@ -274,9 +274,18 @@ typedef struct SCtgTSMAFetch { int32_t tbIdx; int32_t fetchIdx; int32_t resIdx; - int32_t subFetchNum; - int32_t finishedSubFetchNum; - int32_t vgNum; + + // tb meta + int32_t flag; + int32_t vgId; + + // stream progress + int32_t subFetchNum; + int32_t finishedSubFetchNum; + int32_t vgNum; + + // tb tsma + SName tsmaSourceTbName; } SCtgTSMAFetch; typedef struct SCtgTbTSMACtx { @@ -1150,8 +1159,8 @@ bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache); int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTbName, SVgroupInfo* vgroupInfo, SStreamProgressRsp* out, SCtgTaskReq* tReq, void* bInput); -int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx, - int32_t flag); +int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx, int32_t flag, + CTG_TSMA_FETCH_TYPE fetchType, const SName* sourceTbName); extern SCatalogMgmt gCtgMgmt; extern SCtgDebug gCTGDebug; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 529bc7ae65..dba9bebd87 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -2682,7 +2682,19 @@ int32_t ctgLaunchGetTbTSMATask(SCtgTask* pTask) { SCtgTaskReq tReq; tReq.pTask = pTask; tReq.msgIdx = pFetch->fetchIdx; - CTG_ERR_RET(ctgGetTbTSMAFromMnode(pCtg, pConn, pName, NULL, &tReq, TDMT_MND_GET_TABLE_TSMA)); + + switch (pFetch->fetchType) { + case FETCH_TSMA_SOURCE_TB_META: { + CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pFetch->flag, pName, &pFetch->vgId)); + } break; + case FETCH_TB_TSMA: { + CTG_ERR_RET( + ctgGetTbTSMAFromMnode(pCtg, pConn, &pFetch->tsmaSourceTbName, NULL, &tReq, TDMT_MND_GET_TABLE_TSMA)); + } break; + default: + ASSERT(0); + break; + } } return TSDB_CODE_SUCCESS; @@ -2784,6 +2796,38 @@ _return: CTG_RET(code); } +static int32_t ctgTsmaFetchStreamProgress(SCtgTaskReq* tReq, SHashObj* pVgHash, const STableTSMAInfoRsp* pTsmas) { + int32_t code = 0; + SCtgTask* pTask = tReq->pTask; + SCatalog* pCtg = pTask->pJob->pCtg; + int32_t subFetchIdx = 0; + SCtgTbTSMACtx* pCtx = pTask->taskCtx; + SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx); + SRequestConnInfo* pConn = &pTask->pJob->conn; + STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); + const SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->fetchIdx); + SVgroupInfo* pVgInfo = NULL; + + pFetch->vgNum = taosHashGetSize(pVgHash); + for (int32_t i = 0; i < taosArrayGetSize(pTsmas->pTsmas); ++i) { + STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas->pTsmas, i); + pVgInfo = taosHashIterate(pVgHash, NULL); + pTsmaInfo->reqTs = taosGetTimestampMs(); + while (pVgInfo) { + // make StreamProgressReq, send it + SStreamProgressReq req = {.fetchIdx = pFetch->fetchIdx, + .streamId = pTsmaInfo->streamUid, + .subFetchIdx = subFetchIdx++, + .vgId = pVgInfo->vgId}; + CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req)); + pFetch->subFetchNum++; + pVgInfo = taosHashIterate(pVgHash, pVgInfo); + } + } +_return: + CTG_RET(code); +} + int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { bool taskDone = false; int32_t code = 0; @@ -2798,19 +2842,20 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf SCtgDBCache* pDbCache = NULL; STableTSMAInfo* pTsma = NULL; SRequestConnInfo* pConn = &pTask->pJob->conn; + STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); + SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->fetchIdx); CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); switch (reqType) { case TDMT_MND_GET_TABLE_TSMA: { STableTSMAInfoRsp* pOut = pMsgCtx->out; + pFetch->fetchType = FETCH_TSMA_STREAM_PROGRESS; pRes->pRes = pOut; pMsgCtx->out = NULL; if (pOut->pTsmas && taosArrayGetSize(pOut->pTsmas) > 0) { // fetch progress - STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); - const SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->fetchIdx); ctgAcquireVgInfoFromCache(pCtg, pTbReq->dbFName, &pDbCache); if (!pDbCache) { // do not know which vnodes to fetch, fetch vnode list first @@ -2820,23 +2865,7 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, tReq)); } else { // fetch progress from every vnode - int32_t subFetchIdx = 0; - pFetch->vgNum = taosHashGetSize(pDbCache->vgCache.vgInfo->vgHash); - for (int32_t i = 0; i < taosArrayGetSize(pOut->pTsmas); ++i) { - STableTSMAInfo* pTsmaInfo = taosArrayGetP(pOut->pTsmas, i); - pTsmaInfo->reqTs = taosGetTimestampMs(); - SVgroupInfo* pVgInfo = taosHashIterate(pDbCache->vgCache.vgInfo->vgHash, NULL); - while (pVgInfo) { - // make StreamProgressReq, send it - SStreamProgressReq req = {.fetchIdx = pFetch->fetchIdx, - .streamId = pTsmaInfo->streamUid, - .subFetchIdx = subFetchIdx++, - .vgId = pVgInfo->vgId}; - CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req)); - pFetch->subFetchNum++; - pVgInfo = taosHashIterate(pDbCache->vgCache.vgInfo->vgHash, pVgInfo); - } - } + CTG_ERR_JRET(ctgTsmaFetchStreamProgress(tReq, pDbCache->vgCache.vgInfo->vgHash, pOut)); ctgReleaseVgInfoToCache(pCtg, pDbCache); pDbCache = NULL; } @@ -2878,28 +2907,43 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf } } break; case TDMT_MND_USE_DB: { - STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); - SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->fetchIdx); - SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out; - STableTSMAInfoRsp* pTsmas = pRes->pRes; - int32_t subFetchIdx = 0; - pFetch->vgNum = taosHashGetSize(pOut->dbVgroup->vgHash); - TSWAP(pOut->dbVgroup->vgHash, pVgHash); - for (int32_t i = 0; i < taosArrayGetSize(pTsmas->pTsmas); ++i) { - STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas->pTsmas, i); - SVgroupInfo* pVgInfo = taosHashIterate(pVgHash, NULL); - while (pVgInfo) { - // make StreamProgressReq, send it - SStreamProgressReq req = {.fetchIdx = pFetch->fetchIdx, - .streamId = pTsmaInfo->streamUid, - .subFetchIdx = subFetchIdx++, - .vgId = pVgInfo->vgId}; - CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req)); - pFetch->subFetchNum++; - pVgInfo = taosHashIterate(pVgHash, pVgInfo); - } + SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out; + + switch (pFetch->fetchType) { + case FETCH_TSMA_SOURCE_TB_META: { + SVgroupInfo vgInfo = {0}; + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, &pConn->mgmtEps, pOut->dbVgroup, pTbName, &vgInfo)); + + pFetch->vgId = vgInfo.vgId; + CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pTbName, &vgInfo, NULL, tReq)); + } break; + case FETCH_TSMA_STREAM_PROGRESS: { + STableTSMAInfoRsp* pTsmas = pRes->pRes; + TSWAP(pOut->dbVgroup->vgHash, pVgHash); + CTG_ERR_JRET(ctgTsmaFetchStreamProgress(tReq, pVgHash, pTsmas)); + } break; + default: + ASSERT(0); } } break; + case TDMT_VND_TABLE_META: { + // handle source tb meta + ASSERT(pFetch->fetchType == FETCH_TSMA_SOURCE_TB_META); + STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; + pFetch->fetchType = FETCH_TB_TSMA; + pFetch->tsmaSourceTbName = *pTbName; + if (CTG_IS_META_NULL(pOut->metaType)) { + ctgTaskError("no tbmeta found when fetching tsma source tb meta: %s.%s", pTbName->dbname, pTbName->tname); + ctgRemoveTbMetaFromCache(pCtg, pTbName, false); + CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); + } + + if (META_TYPE_BOTH_TABLE == pOut->metaType) { + // rewrite tsma fetch table with it's super table name + snprintf(pFetch->tsmaSourceTbName.tname, TMIN(TSDB_TABLE_NAME_LEN, strlen(pOut->tbName) + 1), "%s", pOut->tbName); + } + CTG_ERR_JRET(ctgGetTbTSMAFromMnode(pCtg, pConn, &pFetch->tsmaSourceTbName, NULL, tReq, TDMT_MND_GET_TABLE_TSMA)); + } break; default: ASSERT(0); } @@ -2921,9 +2965,7 @@ _return: if (TSDB_CODE_MND_SMA_NOT_EXIST == code) { code = TSDB_CODE_SUCCESS; } else { - STablesReq* pReq = (STablesReq*)taosArrayGet(pCtx->pNames, pFetch->dbIdx); - SName* pName = taosArrayGet(pReq->pTables, pFetch->tbIdx); - ctgTaskError("Get tsma for %d.%s.%s faield with err: %s", pName->acctId, pName->dbname, pName->tname, + ctgTaskError("Get tsma for %d.%s.%s faield with err: %s", pTbName->acctId, pTbName->dbname, pTbName->tname, tstrerror(code)); } bool allSubFetchFinished = false; diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 73097c0d8a..d032bf241f 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -3232,9 +3232,10 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx char dbFName[TSDB_DB_FNAME_LEN] = {0}; int32_t flag = CTG_FLAG_UNKNOWN_STB; uint64_t lastSuid = 0; - STableMeta * lastTableMeta = NULL; + STableMeta * pTableMeta = NULL; SName * pName = taosArrayGet(pList, 0); int32_t tbNum = taosArrayGetSize(pList); + SCtgTbCache * pTbCache = NULL; // TODO test sys db if (IS_SYS_DBNAME(pName->dbname)) { @@ -3242,23 +3243,53 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx } tNameGetFullDbName(pName, dbFName); + // get db cache CTG_ERR_RET(ctgAcquireDBCache(pCtg, dbFName, &dbCache)); if (!dbCache) { ctgDebug("DB %s not in cache", dbFName); // TODO test no db cache, select from another db for (int32_t i = 0; i < tbNum; ++i) { - ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag); + ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TSMA_SOURCE_TB_META, NULL); taosArrayPush(pCtx->pResList, &(SMetaData){0}); } return TSDB_CODE_SUCCESS; } for (int32_t i = 0; i < tbNum; ++i) { + // get tb cache pName = taosArrayGet(pList, i); - pCache = taosHashAcquire(dbCache->tsmaCache, pName->tname, strlen(pName->tname)); + pTbCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname)); + if (!pTbCache) { + ctgDebug("tb: %s.%s not in cache", dbFName, pName->tname); + ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TSMA_SOURCE_TB_META, NULL); + taosArrayPush(pCtx->pResList, &(SMetaRes){0}); + continue; + } + uint64_t suid = pTbCache->pMeta->suid; + int8_t tbType = pTbCache->pMeta->tableType; + taosHashRelease(dbCache->tbCache, pTbCache); + SName tsmaSourceTbName = *pName; + + // if child table, get stable name + if (tbType == TSDB_CHILD_TABLE) { + char* stbName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(uint64_t)); + if (stbName) { + snprintf(tsmaSourceTbName.tname, TMIN(TSDB_TABLE_NAME_LEN, strlen(stbName) + 1), "%s", stbName); + taosHashRelease(dbCache->stbCache, stbName); + } else { + ctgDebug("stb in db: %s, uid: %" PRId64 " not in cache", dbFName, suid); + // TODO remove flag + ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TSMA_SOURCE_TB_META, NULL); + taosArrayPush(pCtx->pResList, &(SMetaRes){0}); + continue; + } + } + + // get tsma cache + pCache = taosHashAcquire(dbCache->tsmaCache, tsmaSourceTbName.tname, strlen(tsmaSourceTbName.tname)); if (!pCache) { - ctgDebug("tsma for tb: %s.%s not in cache", dbFName, pName->tname); - ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag); + ctgDebug("tsma for tb: %s.%s not in cache", dbFName, tsmaSourceTbName.tname); + ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TB_TSMA, &tsmaSourceTbName); taosArrayPush(pCtx->pResList, &(SMetaRes){0}); CTG_CACHE_NHIT_INC(CTG_CI_TBL_SMA, 1); continue; @@ -3268,8 +3299,8 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx if (!pCache->pTsmas || pCache->pTsmas->size == 0 || hasOutOfDateTSMACache(pCache->pTsmas)) { CTG_UNLOCK(CTG_READ, &pCache->tsmaLock); taosHashRelease(dbCache->tsmaCache, pCache); - ctgDebug("tsma for tb: %s.%s not in cache", pName->tname, dbFName); - ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag); + ctgDebug("tsma for tb: %s.%s not in cache", tsmaSourceTbName.tname, dbFName); + ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TB_TSMA, &tsmaSourceTbName); taosArrayPush(pCtx->pResList, &(SMetaRes){0}); CTG_CACHE_NHIT_INC(CTG_CI_TBL_TSMA, 1); continue; diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 8454382880..09bbb91eb0 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -557,6 +557,11 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); pName = ctgGetFetchName(ctx->pNames, fetch); + } else if (CTG_TASK_GET_TB_TSMA == pTask->type){ + SCtgTbTSMACtx* pCtx = pTask->taskCtx; + SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx); + STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); + pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx); } else { SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; pName = ctx->pName; @@ -612,6 +617,11 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); pName = ctgGetFetchName(ctx->pNames, fetch); + } else if (CTG_TASK_GET_TB_TSMA == pTask->type){ + SCtgTbTSMACtx* pCtx = pTask->taskCtx; + SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx); + STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); + pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx); } else { SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; pName = ctx->pName; @@ -1556,6 +1566,7 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa return TSDB_CODE_SUCCESS; } +// TODO test errors int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTbName, SVgroupInfo* vgroupInfo, SStreamProgressRsp* out, SCtgTaskReq* tReq, void* bInput) { diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index ac19a7dc9a..99193f1f14 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -2420,6 +2420,7 @@ bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache) { pTsmaCache->dbFName, pTsmaCache->name, pTsmaCache->fillHistoryFinished, 30 * 1000 - pTsmaCache->delayDuration, now - pTsmaCache->reqTs); } else { + // TODO remove log qDebug("tsma %s.%s in cache has been out of date, history finished: %d, remain valid after: %" PRId64 " passed: %" PRId64, pTsmaCache->dbFName, pTsmaCache->name, pTsmaCache->fillHistoryFinished, @@ -2428,8 +2429,8 @@ bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache) { return ret; } -int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx, - int32_t flag) { +int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx, int32_t flag, + CTG_TSMA_FETCH_TYPE fetchType, const SName* sourceTbName) { if (NULL == (*pFetchs)) { *pFetchs = taosArrayInit(CTG_DEFAULT_FETCH_NUM, sizeof(SCtgTSMAFetch)); } @@ -2440,6 +2441,10 @@ int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetch.fetchIdx = (*fetchIdx)++; fetch.resIdx = resIdx; + fetch.flag = flag; + fetch.fetchType = fetchType; + if (sourceTbName) fetch.tsmaSourceTbName = *sourceTbName; + taosArrayPush(*pFetchs, &fetch); return TSDB_CODE_SUCCESS; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 011d4b3c3f..e7d5313b4c 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -816,6 +816,8 @@ void nodesDestroyNode(SNode* pNode) { taosMemoryFreeClear(pReal->pMeta); taosMemoryFreeClear(pReal->pVgroupList); taosArrayDestroyEx(pReal->pSmaIndexes, destroySmaIndex); + taosArrayDestroyP(pReal->tsmaTargetCTbVgInfo, taosMemoryFree); + taosArrayDestroy(pReal->tsmaTargetCTbInfo); break; } case QUERY_NODE_TEMP_TABLE: @@ -1308,6 +1310,8 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyList(pLogicNode->pTags); nodesDestroyNode(pLogicNode->pSubtable); taosArrayDestroyEx(pLogicNode->pFuncTypes, destroyFuncParam); + taosArrayDestroyP(pLogicNode->pTsmaTargetCTbVgInfo, taosMemoryFree); + taosArrayDestroy(pLogicNode->pTsmaTargetCTbInfo); break; } case QUERY_NODE_LOGIC_PLAN_JOIN: { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f6b9669004..6597c1cd4b 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3656,13 +3656,76 @@ static int32_t setTableIndex(STranslateContext* pCxt, SName* pName, SRealTableNo } static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) { + int32_t code = 0; if (pCxt->createStream || QUERY_SMA_OPTIMIZE_DISABLE == tsQuerySmaOptimize) { return TSDB_CODE_SUCCESS; } - if (isSelectStmt(pCxt->pCurrStmt)) { - return getTableTsmas(pCxt, pName, &pRealTable->pTsmas); + if (isSelectStmt(pCxt->pCurrStmt) && pRealTable->pMeta->tableType != TSDB_SYSTEM_TABLE) { + code = getTableTsmas(pCxt, pName, &pRealTable->pTsmas); + // if select from a child table, fetch it's corresponding tsma target child table infos + if (TSDB_CODE_SUCCESS == code && pRealTable->pTsmas && pRealTable->pMeta->tableType == TSDB_CHILD_TABLE) { + if (pRealTable->tsmaTargetCTbVgInfo) { + taosArrayDestroyP(pRealTable->tsmaTargetCTbVgInfo, taosMemoryFree); + pRealTable->tsmaTargetCTbVgInfo = NULL; + } + for (int32_t i = 0; i < pRealTable->pTsmas->size; ++i) { + STableTSMAInfo* pTsma = taosArrayGetP(pRealTable->pTsmas, i); + SName tsmaTargetCTbName = {0}; + toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, "", &tsmaTargetCTbName); + int32_t len = snprintf(tsmaTargetCTbName.tname, TSDB_TABLE_NAME_LEN, "%s.%s", pTsma->dbFName, pTsma->name); + len = taosCreateMD5Hash(tsmaTargetCTbName.tname, len); + sprintf(tsmaTargetCTbName.tname + len, "_%s", pRealTable->table.tableName); + SVgroupInfo vgInfo = {0}; + bool exists = false; + code = catalogGetCachedTableHashVgroup(pCxt->pParseCxt->pCatalog, &tsmaTargetCTbName, &vgInfo, &exists); + if (TSDB_CODE_SUCCESS == code) { + ASSERT(exists); + if (!pRealTable->tsmaTargetCTbVgInfo) { + pRealTable->tsmaTargetCTbVgInfo = taosArrayInit(pRealTable->pTsmas->size, POINTER_BYTES); + if (!pRealTable->tsmaTargetCTbVgInfo) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + } + SVgroupsInfo* pVgpsInfo = taosMemoryCalloc(1, sizeof(int32_t) + sizeof(SVgroupInfo)); + if (!pVgpsInfo) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + pVgpsInfo->numOfVgroups = 1; + pVgpsInfo->vgroups[0] = vgInfo; + taosArrayPush(pRealTable->tsmaTargetCTbVgInfo, &pVgpsInfo); + } else { + break; + } + + STableMeta* pTableMeta = NULL; + if (code == TSDB_CODE_SUCCESS) { + SRequestConnInfo conn = {.pTrans = pCxt->pParseCxt->pTransporter, + .requestId = pCxt->pParseCxt->requestId, + .requestObjRefId = pCxt->pParseCxt->requestRid, + .mgmtEps = pCxt->pParseCxt->mgmtEpSet}; + code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, &conn, &tsmaTargetCTbName, &pTableMeta); + } + if (code == TSDB_CODE_SUCCESS) { + STsmaTargetCTbInfo ctbInfo = {0}; + sprintf(ctbInfo.ctableName, "%s", tsmaTargetCTbName.tname); + ctbInfo.uid = pTableMeta->uid; + taosMemoryFree(pTableMeta); + if (!pRealTable->tsmaTargetCTbInfo) { + pRealTable->tsmaTargetCTbInfo = taosArrayInit(pRealTable->pTsmas->size, sizeof(STsmaTargetCTbInfo)); + if (!pRealTable->tsmaTargetCTbInfo) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + } + + taosArrayPush(pRealTable->tsmaTargetCTbInfo, &ctbInfo); + } + } + } } - return TSDB_CODE_SUCCESS; + return code; } static int32_t setTableCacheLastMode(STranslateContext* pCxt, SSelectStmt* pSelect) { @@ -5562,20 +5625,19 @@ static int32_t findEqualCondTbname(STranslateContext* pCxt, SNode* pWhere, SArra return TSDB_CODE_SUCCESS; } -static int32_t findVgroupsFromEqualTbname(STranslateContext* pCxt, SEqCondTbNameTableInfo* pInfo, - SVgroupsInfo* vgsInfo) { +static int32_t findVgroupsFromEqualTbname(STranslateContext* pCxt, SArray* aTbnames, const char* dbName, + int32_t numOfVgroups, SVgroupsInfo* vgsInfo) { int32_t nVgroups = 0; - int32_t nTbls = taosArrayGetSize(pInfo->aTbnames); + int32_t nTbls = taosArrayGetSize(aTbnames); - if (nTbls >= pInfo->pRealTable->pVgroupList->numOfVgroups) { + if (nTbls >= numOfVgroups) { vgsInfo->numOfVgroups = 0; return TSDB_CODE_SUCCESS; } for (int j = 0; j < nTbls; ++j) { - char* dbName = pInfo->pRealTable->table.dbName; SName snameTb; - char* tbName = taosArrayGetP(pInfo->aTbnames, j); + char* tbName = taosArrayGetP(aTbnames, j); toName(pCxt->pParseCxt->acctId, dbName, tbName, &snameTb); SVgroupInfo vgInfo = {0}; bool bExists; @@ -5605,16 +5667,55 @@ static int32_t setEqualTbnameTableVgroups(STranslateContext* pCxt, SSelectStmt* int32_t code = TSDB_CODE_SUCCESS; for (int i = 0; i < taosArrayGetSize(aTables); ++i) { SEqCondTbNameTableInfo* pInfo = taosArrayGet(aTables, i); - int32_t nTbls = taosArrayGetSize(pInfo->aTbnames); + int32_t nTbls = taosArrayGetSize(pInfo->aTbnames); + int32_t numOfVgs = pInfo->pRealTable->pVgroupList->numOfVgroups; SVgroupsInfo* vgsInfo = taosMemoryMalloc(sizeof(SVgroupsInfo) + nTbls * sizeof(SVgroupInfo)); - int32_t nVgroups = 0; - findVgroupsFromEqualTbname(pCxt, pInfo, vgsInfo); + findVgroupsFromEqualTbname(pCxt, pInfo->aTbnames, pInfo->pRealTable->table.dbName, numOfVgs, vgsInfo); if (vgsInfo->numOfVgroups != 0) { taosMemoryFree(pInfo->pRealTable->pVgroupList); pInfo->pRealTable->pVgroupList = vgsInfo; } else { taosMemoryFree(vgsInfo); + } + vgsInfo = NULL; + + if (pInfo->pRealTable->pTsmas) { + pInfo->pRealTable->tsmaTargetCTbVgInfo = taosArrayInit(pInfo->pRealTable->pTsmas->size, POINTER_BYTES); + if (!pInfo->pRealTable->tsmaTargetCTbVgInfo) return TSDB_CODE_OUT_OF_MEMORY; + + for (int32_t i = 0; i < pInfo->pRealTable->pTsmas->size; ++i) { + STableTSMAInfo* pTsma = taosArrayGetP(pInfo->pRealTable->pTsmas, i); + SArray *pTbNames = taosArrayInit(pInfo->aTbnames->size, POINTER_BYTES); + if (!pTbNames) return TSDB_CODE_OUT_OF_MEMORY; + + for (int32_t k = 0; k < pInfo->aTbnames->size; ++k) { + const char* pTbName = taosArrayGetP(pInfo->aTbnames, k); + char* pNewTbName = taosMemoryCalloc(1, 34 + strlen(pTbName) + 1); + if (!pNewTbName) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + taosArrayPush(pTbNames, &pNewTbName); + sprintf(pNewTbName, "%s.%s", pTsma->dbFName, pTsma->name); + int32_t len = taosCreateMD5Hash(pNewTbName, strlen(pNewTbName)); + sprintf(pNewTbName + len, "_%s", pTbName); + } + if (TSDB_CODE_SUCCESS == code) { + vgsInfo = taosMemoryMalloc(sizeof(SVgroupsInfo) + nTbls * sizeof(SVgroupInfo)); + if (!vgsInfo) code = TSDB_CODE_OUT_OF_MEMORY; + } + if (TSDB_CODE_SUCCESS == code) { + findVgroupsFromEqualTbname(pCxt, pTbNames, pInfo->pRealTable->table.dbName, numOfVgs, vgsInfo); + if (vgsInfo->numOfVgroups != 0) { + taosArrayPush(pInfo->pRealTable->tsmaTargetCTbVgInfo, &vgsInfo); + } else { + taosMemoryFree(vgsInfo); + } + } + taosArrayDestroyP(pTbNames, taosMemoryFree); + if (code) break; + } } } return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 81316089b7..9e9d0ec805 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -365,6 +365,8 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT TSWAP(pScan->pVgroupList, pRealTable->pVgroupList); TSWAP(pScan->pSmaIndexes, pRealTable->pSmaIndexes); TSWAP(pScan->pTsmas, pRealTable->pTsmas); + TSWAP(pScan->pTsmaTargetCTbVgInfo, pRealTable->tsmaTargetCTbVgInfo); + TSWAP(pScan->pTsmaTargetCTbInfo, pRealTable->tsmaTargetCTbInfo); pScan->tableId = pRealTable->pMeta->uid; pScan->stableId = pRealTable->pMeta->suid; pScan->tableType = pRealTable->pMeta->tableType; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index d45b8a347f..63b0a737c2 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -5809,6 +5809,8 @@ typedef struct STSMAOptUsefulTsma { const STableTSMAInfo* pTsma; // NULL if no tsma available, which will use original data for calculation STimeWindow scanRange; // scan time range for this tsma SArray* pTsmaScanCols; // SArray index of tsmaFuncs array + char targetTbName[TSDB_TABLE_NAME_LEN]; // the scanning table name, used only when pTsma is not NULL + uint64_t targetTbUid; // the scanning table uid, used only when pTsma is not NULL } STSMAOptUsefulTsma; typedef struct STSMAOptCtx { @@ -6098,9 +6100,9 @@ SNodeList* tsmaOptCreateTsmaScanCols(const STSMAOptUsefulTsma* pTsma, const SNod // TODO why 2? pCol->colId = *idx + 2; pCol->tableType = TSDB_SUPER_TABLE; - pCol->tableId = pTsma->pTsma->destTbUid; + pCol->tableId = pTsma->targetTbUid; pCol->colType = COLUMN_TYPE_COLUMN; - strcpy(pCol->tableName, pTsma->pTsma->targetTb); + strcpy(pCol->tableName, pTsma->targetTbName); strcpy(pCol->dbName, pTsma->pTsma->targetDbFName); strcpy(pCol->colName, pFunc->node.aliasName); strcpy(pCol->node.aliasName, pFunc->node.aliasName); @@ -6127,9 +6129,9 @@ static int32_t tsmaOptRewriteTag(const STSMAOptCtx* pTsmaOptCtx, const STSMAOptU for (int32_t i = 0; i < pTsma->pTsma->pTags->size; ++i) { const SSchema* pSchema = taosArrayGet(pTsma->pTsma->pTags, i); if (strcmp(pTagCol->colName, pSchema->name) == 0) { - strcpy(pTagCol->tableName, pTsma->pTsma->targetTb); - strcpy(pTagCol->tableAlias, pTsma->pTsma->targetTb); - pTagCol->tableId = pTsma->pTsma->destTbUid; + strcpy(pTagCol->tableName, pTsma->targetTbName); + strcpy(pTagCol->tableAlias, pTsma->targetTbName); + pTagCol->tableId = pTsma->targetTbUid; pTagCol->tableType = TSDB_SUPER_TABLE; pTagCol->colId = pSchema->colId; found = true; @@ -6214,7 +6216,7 @@ struct TsmaOptRewriteCtx { int32_t code; }; -EDealRes tsmaOptRewriter(SNode** ppNode, void* ctx) { +EDealRes tsmaOptNodeRewriter(SNode** ppNode, void* ctx) { SNode* pNode = *ppNode; int32_t code = 0; struct TsmaOptRewriteCtx* pCtx = ctx; @@ -6233,6 +6235,27 @@ EDealRes tsmaOptRewriter(SNode** ppNode, void* ctx) { return DEAL_RES_CONTINUE; } +static int32_t tsmaOptRewriteNode(SNode* pNode, STSMAOptCtx* pCtx, const STSMAOptUsefulTsma* pTsma, bool rewriteTbName, bool rewriteTag) { + struct TsmaOptRewriteCtx ctx = { + .pTsmaOptCtx = pCtx, .pTsma = pTsma, .rewriteTag = rewriteTag, .rewriteTbname = rewriteTbName, .code = 0}; + nodesRewriteExpr(&pNode, tsmaOptNodeRewriter, &ctx); + return ctx.code; +} + +static int32_t tsmaOptRewriteNodeList(SNodeList* pNodes, STSMAOptCtx* pCtx, const STSMAOptUsefulTsma* pTsma, + bool rewriteTbName, bool rewriteTag) { + int32_t code = 0; + SNode* pNode; + FOREACH(pNode, pNodes) { + code = tsmaOptRewriteNode(pNode, pCtx, pTsma, rewriteTbName, rewriteTag); + if (TSDB_CODE_SUCCESS == code) { + // TODO do we need to replace node?? + REPLACE_NODE(pNode); + } + } + return code; +} + static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNewScan, const STSMAOptUsefulTsma* pTsma) { SNode* pNode; int32_t code = 0; @@ -6261,46 +6284,49 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew pNewScan->pScanCols = tsmaOptCreateTsmaScanCols(pTsma, pTsmaOptCtx->pAggFuncs); if (!pNewScan->pScanCols) code = TSDB_CODE_OUT_OF_MEMORY; } + if (pNewScan->tableType == TSDB_CHILD_TABLE) { + } if (code == TSDB_CODE_SUCCESS && pPkTsCol) { - tstrncpy(pPkTsCol->tableName, pTsma->pTsma->targetTb, TSDB_TABLE_NAME_LEN); - tstrncpy(pPkTsCol->tableAlias, pTsma->pTsma->targetTb, TSDB_TABLE_NAME_LEN); - pPkTsCol->tableId = pTsma->pTsma->destTbUid; - pPkTsCol->tableType = TSDB_SUPER_TABLE; + tstrncpy(pPkTsCol->tableName, pTsma->targetTbName, TSDB_TABLE_NAME_LEN); + tstrncpy(pPkTsCol->tableAlias, pTsma->targetTbName, TSDB_TABLE_NAME_LEN); + pPkTsCol->tableId = pTsma->targetTbUid; nodesListMakeStrictAppend(&pNewScan->pScanCols, nodesCloneNode((SNode*)pPkTsCol)); } if (code == TSDB_CODE_SUCCESS) { + // TODO handle child tables pNewScan->stableId = pTsma->pTsma->destTbUid; - pNewScan->tableId = pTsma->pTsma->destTbUid; - pNewScan->tableType = TSDB_SUPER_TABLE; - strcpy(pNewScan->tableName.tname, pTsma->pTsma->targetTb); // TODO set dbName + pNewScan->tableId = pTsma->targetTbUid; + strcpy(pNewScan->tableName.tname, pTsma->targetTbName); } if (code == TSDB_CODE_SUCCESS) { - // pseudo columns - FOREACH(pNode, pNewScan->pScanPseudoCols) { - if (nodeType(pNode) == QUERY_NODE_COLUMN) { - code = tsmaOptRewriteTag(pTsmaOptCtx, pTsma, (SColumnNode*)pNode); - } - } + code = tsmaOptRewriteNodeList(pNewScan->pScanPseudoCols, pTsmaOptCtx, pTsma, false, true); } if (code == TSDB_CODE_SUCCESS) { - FOREACH(pNode, pNewScan->pGroupTags) { - // TODO rewrite tag and tbname recursively - struct TsmaOptRewriteCtx ctx = { - .pTsmaOptCtx = pTsmaOptCtx, .pTsma = pTsma, .rewriteTag = true, .rewriteTbname = true, .code = 0}; - nodesRewriteExpr(&pNode, tsmaOptRewriter, &ctx); - if (ctx.code) { - code = ctx.code; - } else { - REPLACE_NODE(pNode); + code = tsmaOptRewriteNode(pNewScan->pTagCond, pTsmaOptCtx, pTsma, true, true); + } + if (code == TSDB_CODE_SUCCESS) { + code = tsmaOptRewriteNodeList(pNewScan->pGroupTags, pTsmaOptCtx, pTsma, true, true); + } + if (pNewScan->pTsmaTargetCTbVgInfo) { + for (int32_t i = 0; i < taosArrayGetSize(pNewScan->pTsmas); ++i) { + STableTSMAInfo* pTsmaInfo = taosArrayGetP(pNewScan->pTsmas, i); + if (pTsmaInfo == pTsma->pTsma) { + SVgroupsInfo* pVgpsInfo = taosArrayGetP(pNewScan->pTsmaTargetCTbVgInfo, i); + taosMemoryFreeClear(pNewScan->pVgroupList); + int32_t len = sizeof(int32_t) + sizeof(SVgroupInfo) * pVgpsInfo->numOfVgroups; + pNewScan->pVgroupList = taosMemoryCalloc(1, len); + memcpy(pNewScan->pVgroupList, pVgpsInfo, len); + break; } } } } else { + // TODO rewrite tagcond? FOREACH(pNode, pNewScan->pGroupTags) { // rewrite tbname recursively struct TsmaOptRewriteCtx ctx = { .pTsmaOptCtx = pTsmaOptCtx, .pTsma = NULL, .rewriteTag = false, .rewriteTbname = true, .code = 0}; - nodesRewriteExpr(&pNode, tsmaOptRewriter, &ctx); + nodesRewriteExpr(&pNode, tsmaOptNodeRewriter, &ctx); if (ctx.code) { code = ctx.code; } else { @@ -6337,7 +6363,7 @@ static int32_t tsmaOptRevisePlan2(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, const STSMAOptUsefulTsma* pTsma) { int32_t code = 0; SColumnNode* pColNode; - SWindowLogicNode* pWindow; + SWindowLogicNode* pWindow = NULL; SAggLogicNode* pAgg; SNodeList* pAggFuncs; SListCell* pScanListCell; @@ -6360,7 +6386,7 @@ static int32_t tsmaOptRevisePlan2(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, if (fmIsGroupKeyFunc(pAggFunc->funcId)) { struct TsmaOptRewriteCtx ctx = { .pTsmaOptCtx = pTsmaOptCtx, .pTsma = pTsma, .rewriteTag = true, .rewriteTbname = true, .code = 0}; - nodesRewriteExpr(&pAggFuncNode, tsmaOptRewriter, &ctx); + nodesRewriteExpr(&pAggFuncNode, tsmaOptNodeRewriter, &ctx); if (ctx.code) { code = ctx.code; } else { @@ -6437,7 +6463,7 @@ static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, if (fmIsGroupKeyFunc(pAggFunc->funcId)) { struct TsmaOptRewriteCtx ctx = { .pTsmaOptCtx = pTsmaOptCtx, .pTsma = pTsma, .rewriteTag = true, .rewriteTbname = true, .code = 0}; - nodesRewriteExpr(&pAggFuncNode, tsmaOptRewriter, &ctx); + nodesRewriteExpr(&pAggFuncNode, tsmaOptNodeRewriter, &ctx); if (ctx.code) { code = ctx.code; } else { @@ -6498,6 +6524,24 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) { const STSMAOptUsefulTsma* pTsma = NULL; SNodeList* pAggFuncs = NULL; + for (int32_t i = 0; i < pTsmaOptCtx->pUsedTsmas->size; ++i) { + STSMAOptUsefulTsma* pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, i); + if (pTsma->pTsma) { + if (pTsmaOptCtx->pScan->tableType == TSDB_CHILD_TABLE) { + for (int32_t j = 0; j < pTsmaOptCtx->pScan->pTsmas->size; ++j) { + if (taosArrayGetP(pTsmaOptCtx->pScan->pTsmas, j) == pTsma->pTsma) { + const STsmaTargetCTbInfo* pCtbInfo = taosArrayGet(pTsmaOptCtx->pScan->pTsmaTargetCTbInfo, j); + strcpy(pTsma->targetTbName, pCtbInfo->ctableName); + pTsma->targetTbUid = pCtbInfo->uid; + } + } + } else { + strcpy(pTsma->targetTbName, pTsma->pTsma->targetTb); + pTsma->targetTbUid = pTsma->pTsma->destTbUid; + } + } + } + for (int32_t i = 1; i < pTsmaOptCtx->pUsedTsmas->size && code == TSDB_CODE_SUCCESS; ++i) { pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, i); SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN); diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index 5c4de79847..64d80166c8 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -50,6 +50,10 @@ class UsedTsma: def setIsTsma(self): self.is_tsma_ = self.name.endswith(self.TSMA_RES_STB_POSTFIX) + if not self.is_tsma_: + pos = self.name.find('_') + if pos == 32: + self.is_tsma_ = True class TSMAQueryContext: def __init__(self) -> None: @@ -106,9 +110,12 @@ class TSMAQCBuilder: self.qc_.used_tsmas.append(used_tsma) return self - def should_query_with_tsma(self, tsma_name: str, ts_begin: str, ts_end: str) -> 'TSMAQCBuilder': + def should_query_with_tsma(self, tsma_name: str, ts_begin: str, ts_end: str, child_tb: bool = False) -> 'TSMAQCBuilder': used_tsma: UsedTsma = UsedTsma() - used_tsma.name = tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX + if child_tb: + used_tsma.name = tsma_name + else: + used_tsma.name = tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX used_tsma.time_range_start = self.to_timestamp(ts_begin) used_tsma.time_range_end = self.to_timestamp(ts_end) used_tsma.is_tsma_ = True @@ -466,7 +473,7 @@ class TDTestCase: def test_query_sub_table(self): sql = 'select avg(c1) from t1' - ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc() + ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma('e8945e7385834f8c22705546d4016539_t1', UsedTsma.TS_MIN, UsedTsma.TS_MAX, child_tb=True).get_qc() self.tsma_tester.check_sql(sql, ctx) @@ -606,7 +613,7 @@ class TDTestCase: self.test_create_tsma_on_norm_table() self.test_create_tsma_on_child_table() self.test_create_recursive_tsma() - ## self.test_drop_stable() + ## self.test_drop_stable() ## drop stable and recreate a stable ## self.test_drop_ctable() self.test_drop_db()