diff --git a/include/common/tcommon.h b/include/common/tcommon.h index bdf5f99083..30209fe607 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -414,6 +414,8 @@ typedef struct STUidTagInfo { int32_t taosGenCrashJsonMsg(int signum, char **pMsg, int64_t clusterId, int64_t startTime); +#define TSMA_RES_STB_POSTFIX "_tsma_res_stb_" + #ifdef __cplusplus } #endif diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b8bc9c6659..338a297619 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -4192,7 +4192,8 @@ int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pR int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp); void tFreeSViewMetaRsp(SViewMetaRsp* pRsp); typedef struct { - char name[TSDB_TABLE_FNAME_LEN]; + char name[TSDB_TABLE_FNAME_LEN]; // table name or tsma name + bool fetchingTsma; // if we are fetching with tsma name }STableTSMAInfoReq; int32_t tSerializeTableTSMAInfoReq(void* buf, int32_t bufLen, const STableTSMAInfoReq* pReq); @@ -4218,6 +4219,8 @@ typedef struct { int8_t unit; SArray* pFuncs; // SArray SArray* pTags; // SArray + SArray* pUsedCols; // SArray + char* ast; } STableTSMAInfo; typedef struct { diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 16b701e875..b95693e0a5 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -227,6 +227,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_DROP_TSMA, "drop-tsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STB_DROP, "drop-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_TSMA, "get-table-tsma", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_GET_TSMA, "get-tsma", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 46db0d19bd..1d12ac8e57 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -95,6 +95,7 @@ typedef struct SCatalogReq { SArray* pTableTag; // element is SNAME SArray* pView; // element is STablesReq SArray* pTableTSMAs; // element is STablesReq + SArray* pTSMAs; // element is STablesReq bool qNodeRequired; // valid qnode bool dNodeRequired; // valid dnode bool svrVerRequired; @@ -124,6 +125,7 @@ typedef struct SMetaData { SArray* pDnodeList; // pRes = SArray* SArray* pView; // pRes = SViewMeta* SArray* pTableTsmas; // pRes = SArray + SArray* pTsmas; // pRes = SArray SMetaRes* pSvrVer; // pRes = char* } SMetaData; @@ -412,6 +414,8 @@ int32_t catalogRemoveTSMA(SCatalog* pCtg, const STableTSMAInfo* pTsma); int32_t catalogGetTableTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes); +int32_t catalogGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTsmaName, STableTSMAInfo** pTsma); + /** * Destroy catalog and relase all resources */ diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index fb9f49889f..d11e4e565c 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -596,6 +596,7 @@ typedef struct STSMAOptions { SNodeList* pCols; SNode* pInterval; uint8_t tsPrecision; + bool recursiveTsma; // true if create recursive tsma } STSMAOptions; typedef struct SCreateTSMAStmt { @@ -603,7 +604,7 @@ typedef struct SCreateTSMAStmt { bool ignoreExists; char tsmaName[TSDB_INDEX_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN]; - char tableName[TSDB_TABLE_NAME_LEN]; + char tableName[TSDB_TABLE_NAME_LEN]; // base tb name or base tsma name STSMAOptions* pOptions; //SMCreateSmaReq* pReq; } SCreateTSMAStmt; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 348dbf4ae9..19798fa38b 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -937,6 +937,7 @@ int32_t cloneCatalogReq(SCatalogReq **ppTarget, SCatalogReq *pSrc) { pTarget->pTableTag = taosArrayDup(pSrc->pTableTag, NULL); pTarget->pView = taosArrayDup(pSrc->pView, NULL); pTarget->pTableTSMAs = taosArrayDup(pSrc->pTableTSMAs, NULL); + pTarget->pTSMAs = taosArrayDup(pSrc->pTSMAs, NULL); pTarget->qNodeRequired = pSrc->qNodeRequired; pTarget->dNodeRequired = pSrc->dNodeRequired; pTarget->svrVerRequired = pSrc->svrVerRequired; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c213187b54..9fa52060a9 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9943,6 +9943,7 @@ int32_t tSerializeTableTSMAInfoReq(void* buf, int32_t bufLen, const STableTSMAIn if (tStartEncode(&encoder) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeI8(&encoder, pReq->fetchingTsma) < 0) return -1; tEndEncode(&encoder); @@ -9957,6 +9958,7 @@ int32_t tDeserializeTableTSMAInfoReq(void* buf, int32_t bufLen, STableTSMAInfoRe if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeI8(&decoder, (uint8_t*)&pReq->fetchingTsma) < 0) return -1; tEndDecode(&decoder); @@ -9992,6 +9994,14 @@ static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pT const SSchema* pSchema = taosArrayGet(pTsmaInfo->pTags, i); if (tEncodeSSchema(pEncoder, pSchema) < 0) return -1; } + size = pTsmaInfo->pUsedCols ? pTsmaInfo->pUsedCols->size : 0; + if (tEncodeI32(pEncoder, size) < 0) return -1; + for (int32_t i = 0; i < size; ++i) { + const SSchema* pSchema = taosArrayGet(pTsmaInfo->pUsedCols, i); + if (tEncodeSSchema(pEncoder, pSchema) < 0) return -1; + } + + if (tEncodeCStr(pEncoder, pTsmaInfo->ast) < 0) return -1; return 0; } @@ -10033,6 +10043,18 @@ static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInf } } + if (tDecodeI32(pDecoder, &size) < 0) return -1; + if (size > 0) { + pTsmaInfo->pUsedCols = taosArrayInit(size, sizeof(SSchema)); + if (!pTsmaInfo->pUsedCols) return -1; + for (int32_t i = 0; i < size; ++i) { + SSchema schema = {0}; + if (tDecodeSSchema(pDecoder, &schema) < 0) return -1; + taosArrayPush(pTsmaInfo->pUsedCols, &schema); + } + } + if (tDecodeCStrAlloc(pDecoder, &pTsmaInfo->ast) < 0) return -1; + return 0; } @@ -10094,6 +10116,8 @@ void tFreeTableTSMAInfo(void* p) { if (pTsmaInfo) { taosArrayDestroy(pTsmaInfo->pFuncs); taosArrayDestroy(pTsmaInfo->pTags); + taosArrayDestroy(pTsmaInfo->pUsedCols); + taosMemoryFree(pTsmaInfo->ast); } } @@ -10116,10 +10140,21 @@ int32_t tCloneTbTSMAInfo(STableTSMAInfo* pInfo, STableTSMAInfo** pRes) { *pRet = *pInfo; if (pInfo->pFuncs) { pRet->pFuncs = taosArrayDup(pInfo->pFuncs, NULL); + if (!pRet->pFuncs) code = TSDB_CODE_OUT_OF_MEMORY; } - if (pInfo->pTags) { + if (pInfo->pTags && code == TSDB_CODE_SUCCESS) { pRet->pTags = taosArrayDup(pInfo->pTags, NULL); + if (!pRet->pTags) code = TSDB_CODE_OUT_OF_MEMORY; } + if (pInfo->pUsedCols && code == TSDB_CODE_SUCCESS) { + pRet->pUsedCols = taosArrayDup(pInfo->pUsedCols, NULL); + if (!pRet->pUsedCols) code = TSDB_CODE_OUT_OF_MEMORY; + } + if (pInfo->ast && code == TSDB_CODE_SUCCESS) { + pRet->ast = taosStrdup(pInfo->ast); + if (!pRet->ast) code = TSDB_CODE_OUT_OF_MEMORY; + } + if (code) tFreeTableTSMAInfo(pRet); *pRes = pRet; return code; } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 386a586585..73ad03d40f 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -101,6 +101,7 @@ int32_t mndInitSma(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq); mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_TSMA, mndProcessGetTbTSMAReq); + mndSetMsgHandle(pMnode, TDMT_MND_GET_TSMA, mndProcessGetTbTSMAReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndRetrieveTSMA); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndCancelRetrieveTSMA); @@ -1577,7 +1578,7 @@ static void mndTSMAGenerateOutputName(const char* tsmaName, char* streamName, ch SName smaName; tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname); - snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s_tsma_res_stb", tsmaName); + snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s"TSMA_RES_STB_POSTFIX, tsmaName); } static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { @@ -1964,17 +1965,67 @@ int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STa } nodesDestroyNode(pNode); } - if (code == TSDB_CODE_SUCCESS) { - if (pDestStb->numOfTags > 0) { - pInfo->pTags = taosArrayInit(pDestStb->numOfTags, sizeof(SSchema)); + pInfo->ast = taosStrdup(pSma->ast); + if (!pInfo->ast) code = TSDB_CODE_OUT_OF_MEMORY; + + if (code == TSDB_CODE_SUCCESS && pDestStb->numOfTags > 0) { + pInfo->pTags = taosArrayInit(pDestStb->numOfTags, sizeof(SSchema)); + if (!pInfo->pTags) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { for (int32_t i = 0; i < pDestStb->numOfTags; ++i) { taosArrayPush(pInfo->pTags, &pDestStb->pTags[i]); } } } + if (code == TSDB_CODE_SUCCESS) { + pInfo->pUsedCols = taosArrayInit(pDestStb->numOfColumns - 3, sizeof(SSchema)); + if (!pInfo->pUsedCols) + code = TSDB_CODE_OUT_OF_MEMORY; + else { + // skip _wstart, _wend, _duration + for (int32_t i = 1; i < pDestStb->numOfColumns - 2; ++i) { + taosArrayPush(pInfo->pUsedCols, &pDestStb->pColumns[i]); + } + } + } return code; } +static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) { + int32_t code = -1; + SSmaObj *pSma = NULL; + SStbObj *pDstStb = NULL; + + pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, tsmaFName); + if (pSma) { + pDstStb = mndAcquireStb(pMnode, pSma->dstTbName); + if (!pDstStb) { + sdbRelease(pMnode->pSdb, pSma); + return TSDB_CODE_SUCCESS; + } + + STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo)); + if (!pTsma) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + sdbRelease(pMnode->pSdb, pSma); + mndReleaseStb(pMnode, pDstStb); + return code; + } + + terrno = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma); + mndReleaseStb(pMnode, pDstStb); + sdbRelease(pMnode->pSdb, pSma); + if (terrno) return code; + if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tFreeTableTSMAInfo(pTsma); + } + *exist = true; + } + return 0; +} + static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *rsp, bool *exist) { int32_t code = -1; SSmaObj * pSma = NULL; @@ -2011,19 +2062,14 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp } terrno = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma); mndReleaseStb(pMnode, pStb); - if (terrno) { - sdbRelease(pSdb, pSma); - return code; - } + sdbRelease(pSdb, pSma); + if (terrno) return code; if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) { terrno = TSDB_CODE_OUT_OF_MEMORY; tFreeTableTSMAInfo(pTsma); - sdbRelease(pSdb, pSma); return code; } *exist = true; - - sdbRelease(pSdb, pSma); } return TSDB_CODE_SUCCESS; } @@ -2047,7 +2093,11 @@ static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) { goto _OVER; } - code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist); + if (tsmaReq.fetchingTsma) { + code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist); + } else { + code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist); + } if (code) { goto _OVER; } @@ -2092,6 +2142,7 @@ static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo * tstrncpy(pInfo->tb, pTsmaVer->tbName, TSDB_TABLE_NAME_LEN); tstrncpy(pInfo->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN); pInfo->dbId = pTsmaVer->dbId; + pInfo->ast = "dummy";// TODO could be freed *ppTsma = pInfo; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index b2b6bfd102..c27f82a890 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -129,6 +129,7 @@ typedef enum { CTG_TASK_GET_TB_TAG, CTG_TASK_GET_VIEW, CTG_TASK_GET_TB_TSMA, + CTG_TASK_GET_TSMA, } CTG_TASK_TYPE; typedef enum { @@ -391,6 +392,7 @@ typedef struct SCtgJob { int32_t svrVerNum; int32_t viewNum; int32_t tbTsmaNum; + int32_t tsmaNum; // currently, only 1 is possible } SCtgJob; typedef struct SCtgMsgCtx { @@ -1116,13 +1118,14 @@ int32_t ctgGetUserCb(SCtgTask* pTask); int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx, int32_t* fetchIdx, int32_t baseResIdx, SArray* pList); -int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableTSMAInfoRsp* out, - SCtgTaskReq* tReq); -int32_t ctgUpdateTbTSMAEnqueue(SCatalog* pCtg, STSMACache** pTsma, bool syncOp); -int32_t ctgDropTSMAForTbEnqueue(SCatalog* pCtg, SName* pName, bool syncOp); -int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool syncOp); -int32_t ctgOpDropTbTSMA(SCtgCacheOperation* operation); -int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation* operation); +int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaName); +int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* name, STableTSMAInfoRsp* out, + SCtgTaskReq* tReq, int32_t reqType); +int32_t ctgUpdateTbTSMAEnqueue(SCatalog* pCtg, STSMACache** pTsma, bool syncOp); +int32_t ctgDropTSMAForTbEnqueue(SCatalog* pCtg, SName* pName, bool syncOp); +int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool syncOp); +int32_t ctgOpDropTbTSMA(SCtgCacheOperation* operation); +int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation* operation); uint64_t ctgGetTbTSMACacheSize(STSMACache* pTsmaInfo); void ctgFreeTbTSMAInfo(void* p); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 8abcce95ce..cbf2e05d8c 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1832,7 +1832,8 @@ _return: int32_t ctgGetTbTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, SArray** ppRes) { STableTSMAInfoRsp tsmasRsp = {0}; - int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTableName, &tsmasRsp, NULL); + //TODO get from cache first + int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTableName, &tsmasRsp, NULL, TDMT_MND_GET_TABLE_TSMA); if (code == TSDB_CODE_MND_SMA_NOT_EXIST) { code = 0; goto _return; @@ -1870,6 +1871,41 @@ _return: CTG_API_LEAVE(code); } +int32_t ctgGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTsmaName, STableTSMAInfo** pTsma) { + STableTSMAInfoRsp tsmaRsp = {0}; + int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTsmaName, &tsmaRsp, NULL, TDMT_MND_GET_TSMA); + if (code == TSDB_CODE_MND_SMA_NOT_EXIST) { + code = 0; + goto _return; + } + + CTG_ERR_JRET(code); + + ASSERT(tsmaRsp.pTsmas && tsmaRsp.pTsmas->size == 1); + *pTsma = taosArrayGetP(tsmaRsp.pTsmas, 0); + taosArrayDestroy(tsmaRsp.pTsmas); + +_return: + if (tsmaRsp.pTsmas) { + tFreeTableTSMAInfoRsp(&tsmaRsp); + } + CTG_RET(code); +} + +int32_t catalogGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTsmaName, STableTSMAInfo** pTsma) { + CTG_API_ENTER(); + + if (!pCtg || !pConn || !pTsmaName) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + int32_t code = 0; + CTG_ERR_JRET(ctgGetTsma(pCtg, pConn, pTsmaName, pTsma)); + +_return: + CTG_API_LEAVE(code); +} + int32_t catalogClearCache(void) { CTG_API_ENTER_NOLOCK(); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 3e38de466f..0f772d0eba 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -492,6 +492,22 @@ int32_t ctgInitGetTbTSMATask(SCtgJob* pJob, int32_t taskId, void* param) { return TSDB_CODE_SUCCESS; } +int32_t ctgInitGetTSMATask(SCtgJob* pJob, int32_t taskId, void* param) { + SCtgTask task = {0}; + task.type = CTG_TASK_GET_TSMA; + task.taskId = taskId; + task.pJob = pJob; + + SCtgTbTSMACtx* pTaskCtx = taosMemoryCalloc(1, sizeof(SCtgTbTSMACtx)); + if (!pTaskCtx) CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + task.taskCtx = pTaskCtx; + pTaskCtx->pNames = param; + pTaskCtx->pResList = taosArrayInit(pJob->tsmaNum, sizeof(SMetaRes)); + + taosArrayPush(pJob->pTasks, &task); + return 0; +} + int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, const SCatalogReq* pReq) { SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); SHashObj* pTb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); @@ -589,6 +605,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, con } for (int32_t i = 0; i < pJob->tbTsmaNum; ++i) { + // TODO test for it SName* name = taosArrayGet(pReq->pTableTSMAs, i); ctgDropTSMAForTbEnqueue(pCtg, name, true); } @@ -635,6 +652,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const int32_t tbTagNum = (int32_t)taosArrayGetSize(pReq->pTableTag); int32_t viewNum = (int32_t)ctgGetTablesReqNum(pReq->pView); int32_t tbTsmaNum = (int32_t)taosArrayGetSize(pReq->pTableTSMAs); + int32_t tsmaNum = (int32_t)taosArrayGetSize(pReq->pTSMAs); int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum + viewNum + tbTsmaNum; @@ -671,6 +689,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const pJob->tbTagNum = tbTagNum; pJob->viewNum = viewNum; pJob->tbTsmaNum = tbTsmaNum; + pJob->tsmaNum = tsmaNum; #if CTG_BATCH_FETCH pJob->pBatchs = @@ -766,6 +785,9 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const if (tbTsmaNum > 0) { CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TSMA, pReq->pTableTSMAs, NULL)); } + if (tsmaNum > 0) { + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TSMA, pReq->pTSMAs, NULL)); + } if (qnodeNum) { CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_QNODE, NULL, NULL)); @@ -2612,7 +2634,7 @@ int32_t ctgLaunchGetTbTSMATask(SCtgTask* pTask) { for (int32_t idx = 0; idx < dbNum; ++idx) { STablesReq* pReq = taosArrayGet(pCtx->pNames, idx); - ctgGetTbTSMAFromCache(pCtg, pCtx, idx, &fetchIdx, baseResIdx, pReq->pTables); + CTG_ERR_RET(ctgGetTbTSMAFromCache(pCtg, pCtx, idx, &fetchIdx, baseResIdx, pReq->pTables)); baseResIdx += taosArrayGetSize(pReq->pTables); } pCtx->fetchNum = taosArrayGetSize(pCtx->pFetches); @@ -2632,12 +2654,100 @@ int32_t ctgLaunchGetTbTSMATask(SCtgTask* pTask) { SCtgTaskReq tReq; tReq.pTask = pTask; tReq.msgIdx = pFetch->fetchIdx; - CTG_ERR_RET(ctgGetTbTSMAFromMnode(pCtg, pConn, pName, NULL, &tReq)); + CTG_ERR_RET(ctgGetTbTSMAFromMnode(pCtg, pConn, pName, NULL, &tReq, TDMT_MND_GET_TABLE_TSMA)); } return TSDB_CODE_SUCCESS; } +int32_t ctgLaunchGetTSMATask(SCtgTask* pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; + SCtgTbTSMACtx* pCtx = (SCtgTbTSMACtx*)pTask->taskCtx; + SRequestConnInfo* pConn = &pTask->pJob->conn; + SArray* pRes = NULL; + SCtgJob* pJob = pTask->pJob; + // currently, only support fetching one tsma + ASSERT(pCtx->pNames->size == 1); + STablesReq* pReq = taosArrayGet(pCtx->pNames, 0); + ASSERT(pReq->pTables->size == 1); + SName* pTsmaName = taosArrayGet(pReq->pTables, 0); + CTG_ERR_RET(ctgGetTSMAFromCache(pCtg, pCtx, pTsmaName)); + + if (pCtx->pResList->size == 0) { + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, 0); + if (!pMsgCtx->pBatchs) pMsgCtx->pBatchs = pJob->pBatchs; + SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = 0}; + taosArrayPush(pCtx->pResList, &(SMetaRes){0}); + CTG_ERR_RET(ctgGetTbTSMAFromMnode(pCtg, pConn, pTsmaName, NULL, &tReq, TDMT_MND_GET_TSMA)); + } + + return 0; +} + +int32_t ctgHandleGetTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; + SCtgTask* pTask = tReq->pTask; + SCatalog* pCtg = pTask->pJob->pCtg; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); + SCtgTbTSMACtx* pCtx = pTask->taskCtx; + SMetaRes* pRes = taosArrayGet(pCtx->pResList, 0); + STablesReq* pTbReq = taosArrayGet(pCtx->pNames, 0); + SName* pName = taosArrayGet(pTbReq->pTables, 0); + SRequestConnInfo* pConn = &pTask->pJob->conn; + CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); + + switch (reqType) { + case TDMT_MND_TABLE_META: { + STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; + if (!CTG_IS_META_NULL(pOut->metaType)) { + CTG_ERR_JRET(ctgUpdateTbMetaToCache(pCtg, pOut, CTG_FLAG_SYNC_OP)); + } + } break; + case TDMT_MND_GET_TSMA: { + STableTSMAInfoRsp* pOut = pMsgCtx->out; + pRes->code = 0; + if (pOut->pTsmas->size > 0) { + ASSERT(pOut->pTsmas->size == 1); + pRes->pRes = pOut; + pMsgCtx->out = NULL; + TSWAP(pTask->res, pCtx->pResList); + break; + STableTSMAInfo* pTsma = taosArrayGetP(pOut->pTsmas, 0); + + SName dstTbName = *pName; + strcpy(dstTbName.tname, pTsma->targetTb); + + SCtgTbMetaCtx stbCtx = {0}; + stbCtx.flag = CTG_FLAG_STB; + stbCtx.pName = &dstTbName; + STableMeta* pDstTbMeta = NULL; + (void)ctgReadTbMetaFromCache(pCtg, &stbCtx, &pDstTbMeta); + + if (!pDstTbMeta) { + TSWAP(pMsgCtx->lastOut, pMsgCtx->out); + CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pTsma->targetDbFName, dstTbName.tname, NULL, tReq)); + } else { + taosMemoryFreeClear(pDstTbMeta); + } + } + } break; + default: + ASSERT(0); + } + +_return: + if (code) { + if (TSDB_CODE_MND_SMA_NOT_EXIST == code) { + code = TSDB_CODE_SUCCESS; + } else { + ctgTaskError("Get tsma for %d.%s.%s failed with err: %s", pName->acctId, pName->dbname, pName->tname, + tstrerror(code)); + } + } + ctgHandleTaskEnd(pTask, code); + CTG_RET(code); +} + int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { int32_t code = 0; SCtgTask* pTask = tReq->pTask; @@ -2708,6 +2818,16 @@ int32_t ctgDumpTbTSMARes(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } +int32_t ctgDumpTSMARes(SCtgTask* pTask) { + if (pTask->subTask) { + return TSDB_CODE_SUCCESS; + } + + SCtgJob* pJob = pTask->pJob; + pJob->jobRes.pTsmas = pTask->res; + return TSDB_CODE_SUCCESS; +} + int32_t ctgRelaunchGetTbMetaTask(SCtgTask* pTask) { ctgResetTbMetaTask(pTask); @@ -2826,6 +2946,7 @@ SCtgAsyncFps gCtgAsyncFps[] = { {ctgInitGetTbTagTask, ctgLaunchGetTbTagTask, ctgHandleGetTbTagRsp, ctgDumpTbTagRes, NULL, NULL}, {ctgInitGetViewsTask, ctgLaunchGetViewsTask, ctgHandleGetViewsRsp, ctgDumpViewsRes, NULL, NULL}, {ctgInitGetTbTSMATask, ctgLaunchGetTbTSMATask, ctgHandleGetTbTSMARsp, ctgDumpTbTSMARes, NULL, NULL}, + {ctgInitGetTSMATask, ctgLaunchGetTSMATask, ctgHandleGetTSMARsp, ctgDumpTSMARes, NULL, NULL}, }; int32_t ctgMakeAsyncRes(SCtgJob* pJob) { diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 09d9566fef..eb6ecd903c 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -3243,7 +3243,7 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx } tNameGetFullDbName(pName, dbFName); - ctgAcquireDBCache(pCtg, dbFName, &dbCache); + CTG_ERR_RET(ctgAcquireDBCache(pCtg, dbFName, &dbCache)); if (!dbCache) { ctgDebug("DB %s not in cache", dbFName); for (int32_t i = 0; i < tbNum; ++i) { @@ -3312,6 +3312,49 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx CTG_RET(code); } +int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaName) { + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + SCtgDBCache *pDbCache = NULL; + int32_t code = TSDB_CODE_SUCCESS; + SMetaRes res = {0}; + bool found = false; + STSMACache * pTsmaOut = NULL; + + tNameGetFullDbName(pTsmaName, dbFName); + + CTG_ERR_RET(ctgAcquireDBCache(pCtg, dbFName, &pDbCache)); + if (!pDbCache) { + ctgDebug("DB %s not in cache", dbFName); + CTG_RET(code); + } + + void * pIter = taosHashIterate(pDbCache->tsmaCache, NULL); + while (pIter && !found) { + SCtgTSMACache* pCtgCache = pIter; + CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock); + int32_t size = pCtgCache ? (pCtgCache->pTsmas ? pCtgCache->pTsmas->size : 0) : 0; + for (int32_t i = 0; i < size; ++i) { + STSMACache* pCache = taosArrayGetP(pCtgCache->pTsmas, i); + if (memcmp(pCache->name, pTsmaName->tname, TSDB_TABLE_NAME_LEN) == 0) { + found = true; + CTG_CACHE_NHIT_INC(CTG_CI_TBL_TSMA, 1); + code = tCloneTbTSMAInfo(pCache, &pTsmaOut); + break; + } + } + CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock); + pIter = taosHashIterate(pDbCache->tsmaCache, pIter); + } + taosHashCancelIterate(pDbCache->tsmaCache, pIter); + if (found) { + res.pRes = pTsmaOut; + taosArrayPush(pCtx->pResList, &res); + } + + ctgReleaseDBCache(pCtg, pDbCache); + CTG_RET(code); +} + int32_t ctgUpdateTbTSMAEnqueue(SCatalog *pCtg, STSMACache **pTsma, bool syncOp) { int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 5166e1c58a..804f47d2d7 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -337,6 +337,7 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qDebug("Got view-meta from mnode, viewFName:%s", target); break; } + case TDMT_MND_GET_TSMA: case TDMT_MND_GET_TABLE_TSMA: { if (TSDB_CODE_SUCCESS != rspCode) { if (TSDB_CODE_MND_SMA_NOT_EXIST != rspCode) { @@ -1480,11 +1481,10 @@ int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* return TSDB_CODE_SUCCESS; } -int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableTSMAInfoRsp* out, - SCtgTaskReq* tReq) { +int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* name, STableTSMAInfoRsp* out, + SCtgTaskReq* tReq, int32_t reqType) { char* msg = NULL; int32_t msgLen = 0; - int32_t reqType = TDMT_MND_GET_TABLE_TSMA; SCtgTask* pTask = tReq ? tReq->pTask : NULL; void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; char tbFName[TSDB_TABLE_FNAME_LEN]; diff --git a/source/libs/catalog/src/ctgRent.c b/source/libs/catalog/src/ctgRent.c index 90c41fb4b1..37378ee3cb 100755 --- a/source/libs/catalog/src/ctgRent.c +++ b/source/libs/catalog/src/ctgRent.c @@ -268,7 +268,7 @@ void ctgRemoveTSMARent(SCatalog *pCtg, SCtgDBCache *dbCache) { CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock); int32_t size = pCtgCache ? pCtgCache->pTsmas->size : 0; for (int32_t i = 0; i < size; ++i) { - STSMACache* pCache = taosArrayGet(pCtgCache->pTsmas, i); + STSMACache* pCache = taosArrayGetP(pCtgCache->pTsmas, i); if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSortCompare, ctgTSMAVersionSearchCompare)) { ctgDebug("tsma removed from rent, viewId: %" PRIx64 " name: %s.%s.%s", pCache->tsmaId, pCache->dbFName, pCache->tb, pCache->name); } diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index f7f9a5032d..211a21dd4a 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -193,6 +193,9 @@ void ctgFreeSMetaData(SMetaData* pData) { taosArrayDestroy(pData->pTableTsmas); pData->pTableTsmas = NULL; + taosArrayDestroy(pData->pTsmas); + pData->pTsmas = NULL; + taosMemoryFreeClear(pData->pSvrVer); } @@ -625,6 +628,7 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) { } break; } + case TDMT_MND_GET_TSMA: case TDMT_MND_GET_TABLE_TSMA: { if (pCtx->out) { tFreeTableTSMAInfoRsp(pCtx->out); @@ -814,6 +818,7 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void** pRes) { *pRes = NULL; // no need to free it break; } + case CTG_TASK_GET_TSMA: case CTG_TASK_GET_TB_TSMA: { SArray* pArr = (SArray*)*pRes; int32_t num = taosArrayGetSize(pArr); @@ -986,6 +991,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) { taosMemoryFreeClear(pTask->taskCtx); break; } + case CTG_TASK_GET_TSMA: case CTG_TASK_GET_TB_TSMA: { SCtgTbTSMACtx* pTsmaCtx = pTask->taskCtx; taosArrayDestroyEx(pTsmaCtx->pResList, ctgFreeTbTSMARes); @@ -2084,6 +2090,7 @@ void ctgDestroySMetaData(SMetaData* pData) { taosArrayDestroyEx(pData->pDnodeList, ctgFreeDnodeList); taosArrayDestroyEx(pData->pView, ctgFreeViewMeta); taosArrayDestroyEx(pData->pTableTsmas, ctgFreeTbTSMAInfo); + taosArrayDestroyEx(pData->pTsmas, ctgFreeTbTSMAInfo); taosMemoryFreeClear(pData->pSvrVer); } diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 7f1927beec..b1fba588ab 100755 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -282,8 +282,8 @@ SSDataBlock* doNonSortMerge(SOperatorInfo* pOperator) { idx = NON_SORT_NEXT_SRC(pNonSortMerge, pNonSortMerge->lastSourceIdx); continue; } - break; pNonSortMerge->lastSourceIdx = idx - 1; + break; } if (!pBlock) { diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 4091b2ea23..6d208f258b 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -7326,9 +7326,6 @@ static const char* jkTSMAOptionTsPrecision = "Precision"; static int32_t tsmaOptionToJson(const void* pObj, SJson* pJson) { const STSMAOptions* pNode = (const STSMAOptions*)pObj; int32_t code = nodeListToJson(pJson, jkTSMAOptionFuncs, pNode->pFuncs); - if (TSDB_CODE_SUCCESS == code) { - code = nodeListToJson(pJson, jkTSMAOptionCols, pNode->pCols); - } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkTSMAOptionInterval, nodeToJson, pNode->pInterval); } @@ -7341,9 +7338,6 @@ static int32_t tsmaOptionToJson(const void* pObj, SJson* pJson) { static int32_t jsonToTSMAOption(const SJson* pJson, void* pObj) { STSMAOptions* pNode = (STSMAOptions*)pObj; int32_t code = jsonToNodeList(pJson, jkTSMAOptionFuncs, &pNode->pFuncs); - if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeList(pJson, jkTSMAOptionCols, &pNode->pCols); - } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkTSMAOptionInterval, &pNode->pInterval); } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 42668fe4f8..9e36c6d598 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -919,7 +919,6 @@ void nodesDestroyNode(SNode* pNode) { } case QUERY_NODE_TSMA_OPTIONS: { STSMAOptions* pOptions = (STSMAOptions*)pNode; - nodesDestroyList(pOptions->pCols); nodesDestroyList(pOptions->pFuncs); nodesDestroyNode(pOptions->pInterval); break; diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index 702c1836de..782c90421a 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -277,7 +277,7 @@ SNode* createShowCompactsStmt(SAstCreateContext* pCxt, ENodeType type); SNode* createCreateTSMAStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* tsmaName, SNode* pOptions, SNode* pRealTable, SNode* pInterval); -SNode* createTSMAOptions(SAstCreateContext* pCxt, SNodeList* pFuncs, SNodeList* pCols); +SNode* createTSMAOptions(SAstCreateContext* pCxt, SNodeList* pFuncs); SNode* createDefaultTSMAOptions(SAstCreateContext* pCxt); SNode* createDropTSMAStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pRealTable); SNode* createShowCreateTSMAStmt(SAstCreateContext* pCxt, SNode* pRealTable); diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 7009960819..be670b2708 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -111,6 +111,7 @@ typedef struct SParseMetaCache { SHashObj* pTableCfg; // key is tbFName, element is STableCfg* SHashObj* pViews; // key is viewFName, element is SViewMeta* SHashObj* pTableTSMAs; // key is tbFName, elements are SArray + SHashObj* pTSMAs; // key is tsmaFName, elemetns are STableTSMAInfo* SArray* pDnodes; // element is SEpSet bool dnodeRequired; } SParseMetaCache; @@ -155,6 +156,7 @@ int32_t reserveTableIndexInCache(int32_t acctId, const char* pDb, const char* pT int32_t reserveTableCfgInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache); int32_t reserveDnodeRequiredInCache(SParseMetaCache* pMetaCache); int32_t reserveTableTSMAInfoInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache); +int32_t reserveTSMAInfoInCache(int32_t acctId, const char* pDb, const char* pTsmaName, SParseMetaCache* pMetaCache); int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta); int32_t getViewMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta); int32_t buildTableMetaFromViewMeta(STableMeta** pMeta, SViewMeta* pViewMeta); @@ -171,6 +173,7 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes); void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request); SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* pTable, SNodeList* pHint); int32_t getTableTsmasFromCache(SParseMetaCache* pMetaCache, const SName* pTbName, SArray** pTsmas); +int32_t getTsmaFromCache(SParseMetaCache* pMetaCache, const SName* pTsmaName, STableTSMAInfo** pTsma); /** * @brief return a - b with overflow check diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 1e24efd3a0..b1b51e26da 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -582,7 +582,10 @@ db_kind_opt(A) ::= SYSTEM. /************************************************ tsma ********************************************************/ cmd ::= CREATE TSMA not_exists_opt(B) tsma_name(C) - ON full_table_name(E) tsma_opt(D) INTERVAL NK_LP duration_literal(F) NK_RP. { pCxt->pRootNode = createCreateTSMAStmt(pCxt, B, &C, D, E, releaseRawExprNode(pCxt, F)); } + ON full_table_name(E) tsma_func_list(D) + INTERVAL NK_LP duration_literal(F) NK_RP. { pCxt->pRootNode = createCreateTSMAStmt(pCxt, B, &C, D, E, releaseRawExprNode(pCxt, F)); } +cmd ::= CREATE RECURSIVE TSMA not_exists_opt(B) tsma_name(C) + ON full_table_name(D) INTERVAL NK_LP duration_literal(E) NK_RP. { pCxt->pRootNode = createCreateTSMAStmt(pCxt, B, &C, NULL, D, releaseRawExprNode(pCxt, E)); } cmd ::= DROP TSMA exists_opt(B) full_tsma_name(C). { pCxt->pRootNode = createDropTSMAStmt(pCxt, B, C); } cmd ::= SHOW CREATE TSMA full_tsma_name(B). { pCxt->pRootNode = createShowCreateTSMAStmt(pCxt, B); } cmd ::= SHOW db_name_cond_opt(B) TSMAS. { pCxt->pRootNode = createShowTSMASStmt(pCxt, B); } @@ -590,20 +593,9 @@ cmd ::= SHOW db_name_cond_opt(B) TSMAS. full_tsma_name(A) ::= tsma_name(B). { A = createRealTableNodeForIndexName(pCxt, NULL, &B); } full_tsma_name(A) ::= db_name(B) NK_DOT tsma_name(C). { A = createRealTableNodeForIndexName(pCxt, &B, &C); } -%type tsma_opt { SNode* } -%destructor tsma_opt { nodesDestroyNode($$); } -tsma_opt(A) ::= . { A = createDefaultTSMAOptions(pCxt); } -tsma_opt(A) ::= FUNCTION NK_LP tsma_func_list(B) NK_RP - COLUMN NK_LP col_name_list(C) NK_RP. { A = createTSMAOptions(pCxt, B, C); } - -%type tsma_func_list { SNodeList* } -%destructor tsma_func_list { nodesDestroyList($$); } -tsma_func_list(A) ::= tsma_func_name(B). { A = createNodeList(pCxt, B); } -tsma_func_list(A) ::= tsma_func_list(B) NK_COMMA tsma_func_name(C). { A = addNodeToList(pCxt, B, C); } - -%type tsma_func_name { SNode* } -%destructor tsma_func_name { nodesDestroyNode($$); } -tsma_func_name(A) ::= sma_func_name(B). { A = createFunctionNode(pCxt, &B, NULL); } +%type tsma_func_list { SNode* } +%destructor tsma_func_list { nodesDestroyNode($$); } +tsma_func_list(A) ::= FUNCTION NK_LP func_list(B) NK_RP. { A = createTSMAOptions(pCxt, B); } /************************************************ create index ********************************************************/ cmd ::= CREATE SMA INDEX not_exists_opt(D) diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 2685e246f1..43d2651d75 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -2846,7 +2846,19 @@ SNode* createCreateTSMAStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* CHECK_OUT_OF_MEM(pStmt); pStmt->ignoreExists = ignoreExists; - pStmt->pOptions = (STSMAOptions*)pOptions; + if (!pOptions) { + // recursive tsma + pStmt->pOptions = (STSMAOptions*)nodesMakeNode(QUERY_NODE_TSMA_OPTIONS); + if (!pStmt->pOptions) { + nodesDestroyNode((SNode*)pStmt); + pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; + snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "Out of memory"); + return NULL; + } + pStmt->pOptions->recursiveTsma = true; + } else { + pStmt->pOptions = (STSMAOptions*)pOptions; + } pStmt->pOptions->pInterval = pInterval; COPY_STRING_FORM_ID_TOKEN(pStmt->tsmaName, tsmaName); @@ -2858,7 +2870,7 @@ SNode* createCreateTSMAStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* return (SNode*)pStmt; } -SNode* createTSMAOptions(SAstCreateContext* pCxt, SNodeList* pFuncs, SNodeList* pCols) { +SNode* createTSMAOptions(SAstCreateContext* pCxt, SNodeList* pFuncs) { CHECK_PARSER_STATUS(pCxt); /* SNode *pNode1, *pNode2; @@ -2888,8 +2900,10 @@ SNode* createTSMAOptions(SAstCreateContext* pCxt, SNodeList* pFuncs, SNodeList* snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "Out of memory"); return NULL; } + // TODO check duplicate funcs somewhere + // TODO check non supported funcs somewhere pOptions->pFuncs = pFuncs; - pOptions->pCols = pCols; + //pOptions->pCols = pCols; //nodesDestroyList(pFuncs); //nodesDestroyList(pCols); diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 52bdfd2ab0..b1b8d6612e 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -766,9 +766,23 @@ static int32_t collectMetaKeyFromDropViewStmt(SCollectMetaKeyCxt* pCxt, SDropVie static int32_t collectMetaKeyFromCreateTSMAStmt(SCollectMetaKeyCxt* pCxt, SCreateTSMAStmt* pStmt) { int32_t code; - code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); - if (TSDB_CODE_SUCCESS == code) { - code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); + if (pStmt->pOptions->recursiveTsma) { + // if creating recursive tsma, the tablename is tsmaName + code = reserveTSMAInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); + if (TSDB_CODE_SUCCESS == code) { + char dstTbName[TSDB_TABLE_NAME_LEN] = {0}; + // TODO check len + sprintf(dstTbName, "%s"TSMA_RES_STB_POSTFIX, pStmt->tableName); + code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, dstTbName, pCxt->pMetaCache); + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, dstTbName, pCxt->pMetaCache); + } + } + } else { + code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); + } } if (TSDB_CODE_SUCCESS == code) { code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache); diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index 09696d057b..42791fb703 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -201,6 +201,7 @@ static SKeyword keywordTable[] = { {"RATIO", TK_RATIO}, {"PAUSE", TK_PAUSE}, {"READ", TK_READ}, + {"RECURSIVE", TK_RECURSIVE}, {"REDISTRIBUTE", TK_REDISTRIBUTE}, {"RENAME", TK_RENAME}, {"REPLACE", TK_REPLACE}, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8af9a4a3a1..f345a17634 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -709,7 +709,7 @@ static int32_t getDnodeList(STranslateContext* pCxt, SArray** pDnodes) { static int32_t getTableTsmas(STranslateContext* pCxt, const SName* pName, SArray** ppTsmas) { SParseContext* pParCxt = pCxt->pParseCxt; - int32_t code = 0; + int32_t code = 0; if (pParCxt->async) { code = getTableTsmasFromCache(pCxt->pMetaCache, pName, ppTsmas); } else { @@ -719,7 +719,27 @@ static int32_t getTableTsmas(STranslateContext* pCxt, const SName* pName, SArray .mgmtEps = pParCxt->mgmtEpSet}; code = catalogGetTableTsmas(pParCxt->pCatalog, &conn, pName, ppTsmas); } - if (code) parserError("0x%" PRIx64 " getDnodeList error, code:%s", pCxt->pParseCxt->requestId, tstrerror(code)); + if (code) + parserError("0x%" PRIx64 " get table tsma for : %s.%s error, code:%s", pCxt->pParseCxt->requestId, pName->dbname, + pName->tname, tstrerror(code)); + return code; +} + +static int32_t getTsma(STranslateContext* pCxt, const SName* pName, STableTSMAInfo** pTsma) { + int32_t code = 0; + SParseContext* pParCxt = pCxt->pParseCxt; + if (pParCxt->async) { + code = getTsmaFromCache(pCxt->pMetaCache, pName, pTsma); + } else { + SRequestConnInfo conn = {.pTrans = pParCxt->pTransporter, + .requestId = pParCxt->requestId, + .requestObjRefId = pParCxt->requestRid, + .mgmtEps = pParCxt->mgmtEpSet}; + code = catalogGetTsma(pParCxt->pCatalog, &conn, pName, pTsma); + } + if (code) + parserError("0x%" PRIx64 " get tsma for: %s.%s error, code:%s", pCxt->pParseCxt->requestId, pName->dbname, + pName->tname, tstrerror(code)); return code; } @@ -10414,13 +10434,77 @@ static bool sortColWithColId(SNode* pNode1, SNode* pNode2) { return pCol1->colId < pCol2->colId; } +static int32_t buildTSMAAstMakeConcatFuncNode(SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, const SNode* pTbNameFunc, + SFunctionNode** pConcatFuncOut) { + int32_t code = 0; + SFunctionNode* pSubstrFunc = NULL; + SNode* pRes = NULL; + SValueNode* pTsmaNameHashVNode = NULL; + SFunctionNode* pConcatFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); + + if (!pConcatFunc) code = TSDB_CODE_OUT_OF_MEMORY; + + if (code == TSDB_CODE_SUCCESS) { + snprintf(pConcatFunc->functionName, TSDB_FUNC_NAME_LEN, "concat"); + code = nodesListMakeStrictAppend(&pConcatFunc->pParameterList, nodesMakeNode(QUERY_NODE_VALUE)); + } + + if (TSDB_CODE_SUCCESS == code) { + pTsmaNameHashVNode = (SValueNode*)nodesListGetNode(pConcatFunc->pParameterList, 0); + pTsmaNameHashVNode->literal = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1); + if (!pTsmaNameHashVNode->literal) code = TSDB_CODE_OUT_OF_MEMORY; + } + if (TSDB_CODE_SUCCESS == code) { + sprintf(pTsmaNameHashVNode->literal, "%s", pReq->name); + int32_t len = taosCreateMD5Hash(pTsmaNameHashVNode->literal, strlen(pTsmaNameHashVNode->literal)); + pTsmaNameHashVNode->literal[len] = '_'; + pTsmaNameHashVNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR; + pTsmaNameHashVNode->node.resType.bytes = strlen(pTsmaNameHashVNode->literal); + } + + if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->recursiveTsma) { + pSubstrFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); + if (!pSubstrFunc) code = TSDB_CODE_OUT_OF_MEMORY; + if (TSDB_CODE_SUCCESS == code) { + snprintf(pSubstrFunc->functionName, TSDB_FUNC_NAME_LEN, "substr"); + code = nodesListMakeStrictAppend(&pSubstrFunc->pParameterList, nodesCloneNode(pTbNameFunc)); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pSubstrFunc->pParameterList, nodesMakeNode(QUERY_NODE_VALUE)); + if (TSDB_CODE_SUCCESS == code) { + SValueNode* pV = (SValueNode*)pSubstrFunc->pParameterList->pTail->pNode; + pV->literal = strdup("34"); + if (!pV->literal) code = TSDB_CODE_OUT_OF_MEMORY; + pV->isDuration = false; + pV->translate = false; + pV->node.resType.type = TSDB_DATA_TYPE_INT; + pV->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes; + } + } + } + if (TSDB_CODE_SUCCESS == code) { + if (pSubstrFunc) { + code = nodesListAppend(pConcatFunc->pParameterList, (SNode*)pSubstrFunc); + } else { + code = nodesListStrictAppend(pConcatFunc->pParameterList, nodesCloneNode(pTbNameFunc)); + } + } + if (TSDB_CODE_SUCCESS == code) { + *pConcatFuncOut = pConcatFunc; + } else { + nodesDestroyNode((SNode*)pSubstrFunc); + nodesDestroyNode((SNode*)pConcatFunc); + } + return code; +} + static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, - STableMeta* pTableMeta) { + const char* tbName, int32_t numOfTags, const SSchema* pTags) { int32_t code = TSDB_CODE_SUCCESS; SSampleAstInfo info = {0}; info.createSmaIndex = true; info.pDbName = pStmt->dbName; - info.pTableName = pStmt->tableName; + info.pTableName = tbName; info.pFuncs = nodesCloneList(pStmt->pOptions->pFuncs); info.pInterval = nodesCloneNode(pStmt->pOptions->pInterval); if (!info.pFuncs || !info.pInterval) code = TSDB_CODE_OUT_OF_MEMORY; @@ -10435,40 +10519,17 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC code = TSDB_CODE_OUT_OF_MEMORY; } if (code == TSDB_CODE_SUCCESS) { - SFunctionNode* pConcatFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); - SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); - if (!pConcatFunc || !pValue) code = TSDB_CODE_OUT_OF_MEMORY; + SFunctionNode* pConcatFunc = NULL; + code = buildTSMAAstMakeConcatFuncNode(pStmt, pReq, (const SNode*)pTbnameFunc, &pConcatFunc); if (code == TSDB_CODE_SUCCESS) { - pValue->literal = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); - if (!pValue->literal) code = TSDB_CODE_OUT_OF_MEMORY; - } - if (code == TSDB_CODE_SUCCESS) { - strcpy(pValue->literal, pReq->name); - int32_t len = taosCreateMD5Hash(pValue->literal, strlen(pValue->literal)); - pValue->literal[len] = '_'; - snprintf(pConcatFunc->functionName, TSDB_FUNC_NAME_LEN, "concat"); - pValue->node.resType.type = TSDB_DATA_TYPE_VARCHAR; - pValue->node.resType.bytes = strlen(pValue->literal); - pValue->isDuration = false; - pValue->isNull = false; - code = nodesListMakeAppend(&pConcatFunc->pParameterList, (SNode*)pValue); - } - if (code == TSDB_CODE_SUCCESS) { - code = nodesListStrictAppend(pConcatFunc->pParameterList, nodesCloneNode((SNode*)pTbnameFunc)); - } - if (code) { - nodesDestroyNode((SNode*)pConcatFunc); - nodesDestroyNode((SNode*)pValue); - } else { info.pSubTable = (SNode*)pConcatFunc; } } } if (TSDB_CODE_SUCCESS == code) { // append partition by tags - int32_t numOfCols = pTableMeta->tableInfo.numOfColumns, numOfTags = pTableMeta->tableInfo.numOfTags; - for (int32_t idx = numOfCols; idx < numOfCols + numOfTags; ++idx) { - SNode* pTagCol = createColumnNodeWithName(pTableMeta->schema[idx].name); + for (int32_t idx = 0; idx < numOfTags; ++idx) { + SNode* pTagCol = createColumnNodeWithName(pTags[idx].name); if (!pTagCol) { code = TSDB_CODE_OUT_OF_MEMORY; break; @@ -10478,7 +10539,7 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC } } - if (code == TSDB_CODE_SUCCESS) + if (code == TSDB_CODE_SUCCESS && !pStmt->pOptions->recursiveTsma) code = fmCreateStateFuncs(info.pFuncs); if (code == TSDB_CODE_SUCCESS) { @@ -10488,88 +10549,50 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC return code; } -static char* defaultTSMAFuncs[4] = {"max", "min", "sum", "count"}; +static int32_t createColumnBySchema(const SSchema* pSchema, SColumnNode** ppCol) { + *ppCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + int32_t code = 0; + if (!*ppCol) return TSDB_CODE_OUT_OF_MEMORY; -static int32_t -translateTSMAFuncs(STranslateContext * pCxt, SCreateTSMAStmt* pStmt, STableMeta* pTableMeta) { - int32_t code = TSDB_CODE_SUCCESS; - if (!pStmt->pOptions->pFuncs) { - // add default funcs for TSMA - for (int32_t i = 0; i < sizeof(defaultTSMAFuncs) / sizeof(char*); ++i) { - SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); - if (!pFunc || TSDB_CODE_SUCCESS != nodesListMakeAppend(&pStmt->pOptions->pFuncs, (SNode*)pFunc)) { - nodesDestroyNode((SNode*)pFunc); - code = TSDB_CODE_OUT_OF_MEMORY; - break; - } - strcpy(pFunc->functionName, defaultTSMAFuncs[i]); - } - // add all numeric cols + (*ppCol)->colId = pSchema->colId; + strcpy((*ppCol)->colName, pSchema->name); + return TSDB_CODE_SUCCESS; +} + +static int32_t rewriteTSMAFuncs(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, int32_t columnNum, + const SSchema* pCols) { + int32_t code = TSDB_CODE_SUCCESS; + SNode* pNode; + SFunctionNode* pFunc = NULL; + SColumnNode* pCol = NULL; + if (pStmt->pOptions->recursiveTsma) { + // recursive tsma, create func list from base tsma + code = fmCreateStateMergeFuncs(pStmt->pOptions->pFuncs); if (TSDB_CODE_SUCCESS == code) { - for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) { - const SSchema* schema = &pTableMeta->schema[i]; - if (!IS_NUMERIC_TYPE(schema->type)) continue; - SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); - if (!pCol || TSDB_CODE_SUCCESS != nodesListMakeAppend(&pStmt->pOptions->pCols, (SNode*)pCol)) { - nodesDestroyNode((SNode*)pCol); - code = TSDB_CODE_OUT_OF_MEMORY; - break; - } - strcpy(pCol->colName, schema->name); - pCol->colId = schema->colId; + int32_t i = 0; + FOREACH(pNode, pStmt->pOptions->pFuncs) { + // rewrite all func parameters with tsma dest tb cols + pFunc = (SFunctionNode*)pNode; + const SSchema* pSchema = pCols + i; + code = createColumnBySchema(pSchema, &pCol); + if (code) break; + nodesListErase(pFunc->pParameterList, pFunc->pParameterList->pHead); + nodesListPushFront(pFunc->pParameterList, (SNode*)pCol); + //TODO what if exceeds the max size + snprintf(pFunc->node.userAlias, TSDB_COL_NAME_LEN, "%s", pSchema->name); + ++i; } } } else { - SNode* pNode; - FOREACH(pNode, pStmt->pOptions->pCols) { - SColumnNode* pCol = (SColumnNode*)pNode; - for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) { - const SSchema* pSchema = &pTableMeta->schema[i]; - if (strcmp(pSchema->name, pCol->colName) == 0) { - pCol->colId = pSchema->colId; - break; - } - } + FOREACH(pNode, pStmt->pOptions->pFuncs) { + pFunc = (SFunctionNode*)pNode; + // TODO test func params with exprs + pCol = (SColumnNode*)pFunc->pParameterList->pHead->pNode; + snprintf(pFunc->node.userAlias, TSDB_COL_NAME_LEN, "%s(%s)", pFunc->functionName, pCol->colName); } } - nodesSortList(&pStmt->pOptions->pCols, sortColWithColId); - SNode* pNode; - FOREACH(pNode, pStmt->pOptions->pFuncs) { - SFunctionNode* pFunc = (SFunctionNode*)pNode; - int32_t funcId = fmGetFuncId(pFunc->functionName); - if (funcId < 0) { - return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION; - } - pFunc->funcId = funcId; - } - nodesSortList(&pStmt->pOptions->pFuncs, sortFuncWithFuncId); if (TSDB_CODE_SUCCESS == code) { - // assemble funcs with columns - SNode *pNode1, *pNode2; - SNodeList* pTSMAFuncs = nodesMakeList(); - if (!pTSMAFuncs) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; - } - FOREACH(pNode1, pStmt->pOptions->pFuncs) { - FOREACH(pNode2, pStmt->pOptions->pCols) { - SFunctionNode* pFunc = (SFunctionNode*)nodesCloneNode(pNode1); - SColumnNode* pCol = (SColumnNode*)nodesCloneNode(pNode2); - if (!pFunc || !pCol || - (TSDB_CODE_SUCCESS != nodesListMakeAppend(&pFunc->pParameterList, (SNode*)pCol) || (pCol = NULL)) || - TSDB_CODE_SUCCESS != nodesListAppend(pTSMAFuncs, (SNode*)pFunc)) { - nodesDestroyNode((SNode*)pFunc); - nodesDestroyNode((SNode*)pCol); - nodesDestroyList(pTSMAFuncs); - return TSDB_CODE_OUT_OF_MEMORY; - } - // TODO what if exceeds the max size - snprintf(pFunc->node.userAlias, TSDB_COL_NAME_LEN, "%s(%s)", pFunc->functionName, - ((SColumnNode*)pNode2)->colName); - } - } - nodesDestroyList(pStmt->pOptions->pFuncs); - pStmt->pOptions->pFuncs = pTSMAFuncs; + nodesSortList(&pStmt->pOptions->pFuncs, sortFuncWithFuncId); } return code; } @@ -10586,20 +10609,54 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm int32_t code = TSDB_CODE_SUCCESS; STableMeta* pTableMeta = NULL; - code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta); + //TODO 在使用该tableName时, 如果确定其其实是tsma name, 那么避免将此作为tbname进行catalog 获取. + STableTSMAInfo *pRecursiveTsma = NULL; + int32_t numOfCols = 0, numOfTags = 0; + SSchema* pCols = NULL, *pTags = NULL; + if (pStmt->pOptions->recursiveTsma) { + code = getTsma(pCxt, &name, &pRecursiveTsma); + if (code == TSDB_CODE_SUCCESS) { + SNode* pNode; + if (TSDB_CODE_SUCCESS != nodesStringToNode(pRecursiveTsma->ast, &pNode)) { + return TSDB_CODE_TSMA_INVALID_STAT; + } + SSelectStmt* pSelect = (SSelectStmt*)pNode; + FOREACH(pNode, pSelect->pProjectionList) { + SFunctionNode* pFuncNode = (SFunctionNode*)pNode; + if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue; + nodesListMakeStrictAppend(&pStmt->pOptions->pFuncs, nodesCloneNode(pNode)); + } + nodesDestroyNode((SNode*)pSelect); + } + memset(&name, 0, sizeof(SName)); + tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pRecursiveTsma->targetTb, &name), pReq->stb); + numOfCols = pRecursiveTsma->pUsedCols->size; // TODO merge pUsedCols and pTags with one SSchema array + numOfTags = pRecursiveTsma->pTags->size; + pCols = pRecursiveTsma->pUsedCols->pData; + pTags = pRecursiveTsma->pTags->pData; + } else { + code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta); + if (TSDB_CODE_SUCCESS == code) { + numOfCols = pTableMeta->tableInfo.numOfColumns; + numOfTags = pTableMeta->tableInfo.numOfTags; + pCols = pTableMeta->schema; + pTags = pTableMeta->schema + numOfCols; + } + } if (TSDB_CODE_SUCCESS == code) { - code = getSmaIndexDstVgId(pCxt, pStmt->dbName, pStmt->tableName, &pReq->dstVgId); + //code = getSmaIndexDstVgId(pCxt, pStmt->dbName, pStmt->tableName, &pReq->dstVgId); } if (TSDB_CODE_SUCCESS == code) { code = getSmaIndexSql(pCxt, &pReq->sql, &pReq->sqlLen); } if (TSDB_CODE_SUCCESS == code) { - code = translateTSMAFuncs(pCxt, pStmt, pTableMeta); + code = rewriteTSMAFuncs(pCxt, pStmt, numOfCols, pCols); } if (TSDB_CODE_SUCCESS == code) { - code = buildTSMAAst(pCxt, pStmt, pReq, pTableMeta); + code = buildTSMAAst(pCxt, pStmt, pReq, pStmt->pOptions->recursiveTsma ? pRecursiveTsma->targetTb : pStmt->tableName, + numOfTags, pTags); } /* if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index e5a6d0dead..fe0010859a 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -16,6 +16,8 @@ #include "parUtil.h" #include "cJSON.h" #include "querynodes.h" +#include "tarray.h" +#include "tlog.h" #define USER_AUTH_KEY_MAX_LEN TSDB_USER_LEN + TSDB_TABLE_FNAME_LEN + 2 @@ -726,6 +728,9 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog if (TSDB_CODE_SUCCESS == code) { code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pTableTSMAs); } + if (TSDB_CODE_SUCCESS == code) { + code = buildTableReqFromDb(pMetaCache->pTSMAs, &pCatalogReq->pTSMAs); + } #ifdef TD_ENTERPRISE if (TSDB_CODE_SUCCESS == code) { code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pView); @@ -870,6 +875,9 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet if (TSDB_CODE_SUCCESS == code) { code = putDbTableDataToCache(pCatalogReq->pTableTSMAs, pMetaData->pTableTsmas, &pMetaCache->pTableTSMAs); } + if (TSDB_CODE_SUCCESS == code) { + code = putDbTableDataToCache(pCatalogReq->pTSMAs, pMetaData->pTsmas, &pMetaCache->pTSMAs); + } #ifdef TD_ENTERPRISE if (TSDB_CODE_SUCCESS == code) { code = putDbTableDataToCache(pCatalogReq->pView, pMetaData->pView, &pMetaCache->pViews); @@ -1144,6 +1152,10 @@ int32_t reserveTableTSMAInfoInCache(int32_t acctId, const char *pDb, const char return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableTSMAs); } +int32_t reserveTSMAInfoInCache(int32_t acctId, const char* pDb, const char* pTsmaName, SParseMetaCache* pMetaCache) { + return reserveTableReqInDbCache(acctId, pDb, pTsmaName, &pMetaCache->pTSMAs); +} + int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes) { char fullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pName, fullName); @@ -1161,15 +1173,26 @@ int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, int32_t getTableTsmasFromCache(SParseMetaCache* pMetaCache, const SName* pTbName, SArray** pTsmas) { char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTbName, tbFName); - STableTSMAInfoRsp *pTsmasRsp = NULL; - STableTSMAInfo* pTsma = NULL; - int32_t code = getMetaDataFromHash(tbFName, strlen(tbFName), pMetaCache->pTableTSMAs, (void**)&pTsmasRsp); + STableTSMAInfoRsp* pTsmasRsp = NULL; + int32_t code = getMetaDataFromHash(tbFName, strlen(tbFName), pMetaCache->pTableTSMAs, (void**)&pTsmasRsp); if (TSDB_CODE_SUCCESS == code && pTsmasRsp) { *pTsmas = pTsmasRsp->pTsmas; } return TSDB_CODE_SUCCESS; } +int32_t getTsmaFromCache(SParseMetaCache* pMetaCache, const SName* pTsmaName, STableTSMAInfo** pTsma) { + char tsmaFName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pTsmaName, tsmaFName); + STableTSMAInfoRsp* pTsmaRsp = NULL; + int32_t code = getMetaDataFromHash(tsmaFName, strlen(tsmaFName), pMetaCache->pTSMAs, (void**)&pTsmaRsp); + if (TSDB_CODE_SUCCESS == code && pTsmaRsp) { + ASSERT(pTsmaRsp->pTsmas->size == 1); + *pTsma = taosArrayGetP(pTsmaRsp->pTsmas, 0); + } + return code; +} + STableCfg* tableCfgDup(STableCfg* pCfg) { STableCfg* pNew = taosMemoryMalloc(sizeof(*pNew)); @@ -1242,10 +1265,12 @@ void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) { destoryParseTablesMetaReqHash(pMetaCache->pTableMeta); destoryParseTablesMetaReqHash(pMetaCache->pTableVgroup); destoryParseTablesMetaReqHash(pMetaCache->pViews); + destoryParseTablesMetaReqHash(pMetaCache->pTSMAs); } else { taosHashCleanup(pMetaCache->pTableMeta); taosHashCleanup(pMetaCache->pTableVgroup); taosHashCleanup(pMetaCache->pViews); + taosHashCleanup(pMetaCache->pTSMAs); } taosHashCleanup(pMetaCache->pDbVgroup); taosHashCleanup(pMetaCache->pDbCfg); diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index c5b657910e..4ba61f5353 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -271,6 +271,7 @@ void destoryCatalogReq(SCatalogReq *pCatalogReq) { taosArrayDestroy(pCatalogReq->pView); #endif taosArrayDestroy(pCatalogReq->pTableTSMAs); + taosArrayDestroy(pCatalogReq->pTSMAs); } else { taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq); taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq); @@ -278,6 +279,7 @@ void destoryCatalogReq(SCatalogReq *pCatalogReq) { taosArrayDestroyEx(pCatalogReq->pView, destoryTablesReq); #endif taosArrayDestroyEx(pCatalogReq->pTableTSMAs, destoryTablesReq); + taosArrayDestroyEx(pCatalogReq->pTSMAs, destoryTablesReq); } taosArrayDestroy(pCatalogReq->pUdf); taosArrayDestroy(pCatalogReq->pIndex); diff --git a/source/libs/parser/src/sql.c b/source/libs/parser/src/sql.c index b0e6ef024c..81d82cbe14 100644 --- a/source/libs/parser/src/sql.c +++ b/source/libs/parser/src/sql.c @@ -1712,6 +1712,7 @@ static const YYCODETYPE yyFallback[] = { 0, /* SYSTEM => nothing */ 0, /* TSMA => nothing */ 0, /* INTERVAL => nothing */ + 0, /* RECURSIVE => nothing */ 0, /* TSMAS => nothing */ 0, /* FUNCTION => nothing */ 0, /* INDEX => nothing */ diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 6e7a6705bc..ec1bc2c9d6 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -6168,7 +6168,7 @@ static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbN // TODO test child tbname too long // if with tsma, we replace func tbname with substr(tbname, 34) pRewrittenFunc->funcId = fmGetFuncId("substr"); - snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "substr(tbname, 34)"); + snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "substr"); pValue->node.resType.type = TSDB_DATA_TYPE_INT; pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes; pValue->literal = taosMemoryCalloc(1, 16); @@ -6184,7 +6184,7 @@ static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbN } else if (code == TSDB_CODE_SUCCESS) { // if no tsma, we replace func tbname with concat('', tbname) pRewrittenFunc->funcId = fmGetFuncId("concat"); - snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "concat('', tbname)"); + snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "concat"); pValue->node.resType = ((SExprNode*)(*pTbNameNode))->resType; pValue->literal = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1); @@ -6387,6 +6387,7 @@ static int32_t tsmaOptRevisePlan2(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, pColNode = (SColumnNode*)pScanListCell->pNode; pScanListCell = pScanListCell->pNext; pColNode->node.resType = pPartial->node.resType; + // currently we assume that the first parameter must be the scan column nodesListErase(pMerge->pParameterList, pMerge->pParameterList->pHead); // TODO STRICT nodesListPushFront(pMerge->pParameterList, nodesCloneNode((SNode*)pColNode)); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 0051753b07..8cb2bb19d2 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -322,6 +322,25 @@ int32_t queryBuildGetTableTSMAMsg(void *input, char **msg, int32_t msgSize, int3 return TSDB_CODE_SUCCESS; } +int32_t queryBuildGetTSMAMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, + void *(*mallcFp)(int64_t)) { + if (NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + STableTSMAInfoReq req = {0}; + req.fetchingTsma = true; + strncpy(req.name, input, sizeof(req.name) - 1); + + int32_t bufLen = tSerializeTableTSMAInfoReq(NULL, 0, &req); + void * pBuf = (*mallcFp)(bufLen); + tSerializeTableTSMAInfoReq(pBuf, bufLen, &req); + + *msg = pBuf; + *msgLen = bufLen; + return TSDB_CODE_SUCCESS; +} + int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { SUseDbOutput *pOut = output; SUseDbRsp usedbRsp = {0}; @@ -731,6 +750,7 @@ void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryBuildGetSerVerMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_VIEW_META)] = queryBuildGetViewMetaMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_TSMA)] = queryBuildGetTableTSMAMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TSMA)] = queryBuildGetTSMAMsg; queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; @@ -747,6 +767,7 @@ void initQueryModuleMsgHandle() { queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_VIEW_META)] = queryProcessGetViewMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_TSMA)] = queryProcessGetTbTSMARsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TSMA)] = queryProcessGetTbTSMARsp; } #pragma GCC diagnostic pop diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index 4b70a9eddd..9fee3ddd4a 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -1,3 +1,4 @@ +from os import name from random import randrange import taos import time @@ -23,6 +24,7 @@ class TSMA: class UsedTsma: TS_MIN = '-9223372036854775808' TS_MAX = '9223372036854775806' + TSMA_RES_STB_POSTFIX = '_tsma_res_stb_' def __init__(self) -> None: self.name = '' ## tsma name or table name @@ -47,6 +49,9 @@ class UsedTsma: def __repr__(self) -> str: return self.__str__() + + def setIsTsma(self): + self.is_tsma_ = self.name.endswith(self.TSMA_RES_STB_POSTFIX) class TSMAQueryContext: def __init__(self) -> None: @@ -105,7 +110,7 @@ class TSMAQueryContextBuilder: def should_query_with_tsma(self, tsma_name: str, ts_begin: str, ts_end: str) -> 'TSMAQueryContextBuilder': used_tsma: UsedTsma = UsedTsma() - used_tsma.name = tsma_name + '_tsma_res_stb' + used_tsma.name = tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX used_tsma.time_range_start = self.to_timestamp(ts_begin) used_tsma.time_range_end = self.to_timestamp(ts_end) used_tsma.is_tsma_ = True @@ -139,8 +144,7 @@ class TSMATestContext: if idx >= 0: words = row[idx:].split(' ') used_tsma.name = words[3] - if used_tsma.name.endswith('tsma_res_stb'): - used_tsma.is_tsma_ = True + used_tsma.setIsTsma() else: idx = row.find('Time Range:') if idx >= 0: @@ -170,7 +174,7 @@ class TSMATestContext: tdLog.exit('check explain failed for sql: %s \nexpect: %s \nactual: %s' % (sql, str(expect), str(query_ctx))) def check_result(self, sql: str): - #tdSql.execute("alter local 'querySmaOptimize' '1'") + tdSql.execute("alter local 'querySmaOptimize' '1'") tsma_res = tdSql.getResult(sql) tdSql.execute("alter local 'querySmaOptimize' '0'") @@ -185,6 +189,7 @@ class TSMATestContext: for row_no_tsma, row_tsma in zip(no_tsma_res, tsma_res): if row_no_tsma != row_tsma: tdLog.exit("comparing tsma res for: %s got different row data: no tsma row: %s, tsma row: %s" % (sql, str(row_no_tsma), str(row_tsma))) + tdLog.info('result check succeed for sql: %s. \n tsma-res: %s. \nno_tsma-res: %s' % (sql, str(tsma_res), str(no_tsma_res))) def check_sql(self, sql: str, expect: TSMAQueryContext): self.check_explain(sql, expect=expect) @@ -298,13 +303,18 @@ class TDTestCase: startTs=paraDict["startTs"],tsStep=paraDict["tsStep"]) return - def create_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, col_list: list, interval: str): + def create_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, interval: str): tdSql.execute('use %s' % db) - sql = "create tsma %s on %s.%s function(%s) column(%s) interval(%s)" % (tsma_name, db, tb, ','.join(func_list), ','.join(col_list), interval) + sql = "CREATE TSMA %s ON %s.%s FUNCTION(%s) INTERVAL(%s)" % (tsma_name, db, tb, ','.join(func_list), interval) + tdSql.execute(sql, queryTimes=1) + + def create_recursive_tsma(self, base_tsma_name: str, new_tsma_name: str, db: str, interval: str): + tdSql.execute('use %s' % db, queryTimes=1) + sql = 'CREATE RECURSIVE TSMA %s ON %s.%s INTERVAL(%s)' % (new_tsma_name, db, base_tsma_name, interval) tdSql.execute(sql, queryTimes=1) def drop_tsma(self, tsma_name: str, db: str): - sql = 'drop tsma %s.%s' % (db, tsma_name) + sql = 'DROP TSMA %s.%s' % (db, tsma_name) tdSql.execute(sql, queryTimes=1) def check_explain_res_has_row(self, plan_str_expect: str, explain_output): @@ -323,10 +333,13 @@ class TDTestCase: def test_query_with_tsma(self): self.init_data() - self.create_tsma('tsma1', 'test', 'meters', ['avg'], ['c1', 'c2'], '5m') - self.create_tsma('tsma2', 'test', 'meters', ['avg'], ['c1', 'c2'], '30m') + self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') + self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m') + self.create_recursive_tsma('tsma1', 'tsma3', 'test', '20m') + self.create_recursive_tsma('tsma2', 'tsma4', 'test', '1h') + ## why need 5s, calculation not finished yet. time.sleep(5) - time.sleep(9999999) + #time.sleep(9999999) self.test_query_with_tsma_interval() self.test_query_with_tsma_agg()