support recursive tsma creation

This commit is contained in:
wangjiaming0909 2024-01-31 14:20:49 +08:00
parent 1509ce04cf
commit 70ca830378
31 changed files with 630 additions and 185 deletions

View File

@ -414,6 +414,8 @@ typedef struct STUidTagInfo {
int32_t taosGenCrashJsonMsg(int signum, char **pMsg, int64_t clusterId, int64_t startTime);
#define TSMA_RES_STB_POSTFIX "_tsma_res_stb_"
#ifdef __cplusplus
}
#endif

View File

@ -4192,7 +4192,8 @@ int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pR
int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp);
void tFreeSViewMetaRsp(SViewMetaRsp* pRsp);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char name[TSDB_TABLE_FNAME_LEN]; // table name or tsma name
bool fetchingTsma; // if we are fetching with tsma name
}STableTSMAInfoReq;
int32_t tSerializeTableTSMAInfoReq(void* buf, int32_t bufLen, const STableTSMAInfoReq* pReq);
@ -4218,6 +4219,8 @@ typedef struct {
int8_t unit;
SArray* pFuncs; // SArray<STableTSMAFuncInfo>
SArray* pTags; // SArray<SSchema>
SArray* pUsedCols; // SArray<SSchema>
char* ast;
} STableTSMAInfo;
typedef struct {

View File

@ -227,6 +227,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TSMA, "drop-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STB_DROP, "drop-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_TSMA, "get-table-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_TSMA, "get-tsma", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8

View File

@ -95,6 +95,7 @@ typedef struct SCatalogReq {
SArray* pTableTag; // element is SNAME
SArray* pView; // element is STablesReq
SArray* pTableTSMAs; // element is STablesReq
SArray* pTSMAs; // element is STablesReq
bool qNodeRequired; // valid qnode
bool dNodeRequired; // valid dnode
bool svrVerRequired;
@ -124,6 +125,7 @@ typedef struct SMetaData {
SArray* pDnodeList; // pRes = SArray<SEpSet>*
SArray* pView; // pRes = SViewMeta*
SArray* pTableTsmas; // pRes = SArray<STableTSMAInfo*>
SArray* pTsmas; // pRes = SArray<STableTSMAInfo*>
SMetaRes* pSvrVer; // pRes = char*
} SMetaData;
@ -412,6 +414,8 @@ int32_t catalogRemoveTSMA(SCatalog* pCtg, const STableTSMAInfo* pTsma);
int32_t catalogGetTableTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes);
int32_t catalogGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTsmaName, STableTSMAInfo** pTsma);
/**
* Destroy catalog and relase all resources
*/

View File

@ -596,6 +596,7 @@ typedef struct STSMAOptions {
SNodeList* pCols;
SNode* pInterval;
uint8_t tsPrecision;
bool recursiveTsma; // true if create recursive tsma
} STSMAOptions;
typedef struct SCreateTSMAStmt {
@ -603,7 +604,7 @@ typedef struct SCreateTSMAStmt {
bool ignoreExists;
char tsmaName[TSDB_INDEX_NAME_LEN];
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN]; // base tb name or base tsma name
STSMAOptions* pOptions;
//SMCreateSmaReq* pReq;
} SCreateTSMAStmt;

View File

@ -937,6 +937,7 @@ int32_t cloneCatalogReq(SCatalogReq **ppTarget, SCatalogReq *pSrc) {
pTarget->pTableTag = taosArrayDup(pSrc->pTableTag, NULL);
pTarget->pView = taosArrayDup(pSrc->pView, NULL);
pTarget->pTableTSMAs = taosArrayDup(pSrc->pTableTSMAs, NULL);
pTarget->pTSMAs = taosArrayDup(pSrc->pTSMAs, NULL);
pTarget->qNodeRequired = pSrc->qNodeRequired;
pTarget->dNodeRequired = pSrc->dNodeRequired;
pTarget->svrVerRequired = pSrc->svrVerRequired;

View File

@ -9943,6 +9943,7 @@ int32_t tSerializeTableTSMAInfoReq(void* buf, int32_t bufLen, const STableTSMAIn
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->fetchingTsma) < 0) return -1;
tEndEncode(&encoder);
@ -9957,6 +9958,7 @@ int32_t tDeserializeTableTSMAInfoReq(void* buf, int32_t bufLen, STableTSMAInfoRe
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, (uint8_t*)&pReq->fetchingTsma) < 0) return -1;
tEndDecode(&decoder);
@ -9992,6 +9994,14 @@ static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pT
const SSchema* pSchema = taosArrayGet(pTsmaInfo->pTags, i);
if (tEncodeSSchema(pEncoder, pSchema) < 0) return -1;
}
size = pTsmaInfo->pUsedCols ? pTsmaInfo->pUsedCols->size : 0;
if (tEncodeI32(pEncoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
const SSchema* pSchema = taosArrayGet(pTsmaInfo->pUsedCols, i);
if (tEncodeSSchema(pEncoder, pSchema) < 0) return -1;
}
if (tEncodeCStr(pEncoder, pTsmaInfo->ast) < 0) return -1;
return 0;
}
@ -10033,6 +10043,18 @@ static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInf
}
}
if (tDecodeI32(pDecoder, &size) < 0) return -1;
if (size > 0) {
pTsmaInfo->pUsedCols = taosArrayInit(size, sizeof(SSchema));
if (!pTsmaInfo->pUsedCols) return -1;
for (int32_t i = 0; i < size; ++i) {
SSchema schema = {0};
if (tDecodeSSchema(pDecoder, &schema) < 0) return -1;
taosArrayPush(pTsmaInfo->pUsedCols, &schema);
}
}
if (tDecodeCStrAlloc(pDecoder, &pTsmaInfo->ast) < 0) return -1;
return 0;
}
@ -10094,6 +10116,8 @@ void tFreeTableTSMAInfo(void* p) {
if (pTsmaInfo) {
taosArrayDestroy(pTsmaInfo->pFuncs);
taosArrayDestroy(pTsmaInfo->pTags);
taosArrayDestroy(pTsmaInfo->pUsedCols);
taosMemoryFree(pTsmaInfo->ast);
}
}
@ -10116,10 +10140,21 @@ int32_t tCloneTbTSMAInfo(STableTSMAInfo* pInfo, STableTSMAInfo** pRes) {
*pRet = *pInfo;
if (pInfo->pFuncs) {
pRet->pFuncs = taosArrayDup(pInfo->pFuncs, NULL);
if (!pRet->pFuncs) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (pInfo->pTags) {
if (pInfo->pTags && code == TSDB_CODE_SUCCESS) {
pRet->pTags = taosArrayDup(pInfo->pTags, NULL);
if (!pRet->pTags) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (pInfo->pUsedCols && code == TSDB_CODE_SUCCESS) {
pRet->pUsedCols = taosArrayDup(pInfo->pUsedCols, NULL);
if (!pRet->pUsedCols) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (pInfo->ast && code == TSDB_CODE_SUCCESS) {
pRet->ast = taosStrdup(pInfo->ast);
if (!pRet->ast) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code) tFreeTableTSMAInfo(pRet);
*pRes = pRet;
return code;
}

View File

@ -101,6 +101,7 @@ int32_t mndInitSma(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_TSMA, mndProcessGetTbTSMAReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_TSMA, mndProcessGetTbTSMAReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndRetrieveTSMA);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndCancelRetrieveTSMA);
@ -1577,7 +1578,7 @@ static void mndTSMAGenerateOutputName(const char* tsmaName, char* streamName, ch
SName smaName;
tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s_tsma_res_stb", tsmaName);
snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s"TSMA_RES_STB_POSTFIX, tsmaName);
}
static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
@ -1964,17 +1965,67 @@ int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STa
}
nodesDestroyNode(pNode);
}
if (code == TSDB_CODE_SUCCESS) {
if (pDestStb->numOfTags > 0) {
pInfo->ast = taosStrdup(pSma->ast);
if (!pInfo->ast) code = TSDB_CODE_OUT_OF_MEMORY;
if (code == TSDB_CODE_SUCCESS && pDestStb->numOfTags > 0) {
pInfo->pTags = taosArrayInit(pDestStb->numOfTags, sizeof(SSchema));
if (!pInfo->pTags) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
for (int32_t i = 0; i < pDestStb->numOfTags; ++i) {
taosArrayPush(pInfo->pTags, &pDestStb->pTags[i]);
}
}
}
if (code == TSDB_CODE_SUCCESS) {
pInfo->pUsedCols = taosArrayInit(pDestStb->numOfColumns - 3, sizeof(SSchema));
if (!pInfo->pUsedCols)
code = TSDB_CODE_OUT_OF_MEMORY;
else {
// skip _wstart, _wend, _duration
for (int32_t i = 1; i < pDestStb->numOfColumns - 2; ++i) {
taosArrayPush(pInfo->pUsedCols, &pDestStb->pColumns[i]);
}
}
}
return code;
}
static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) {
int32_t code = -1;
SSmaObj *pSma = NULL;
SStbObj *pDstStb = NULL;
pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, tsmaFName);
if (pSma) {
pDstStb = mndAcquireStb(pMnode, pSma->dstTbName);
if (!pDstStb) {
sdbRelease(pMnode->pSdb, pSma);
return TSDB_CODE_SUCCESS;
}
STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
if (!pTsma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pMnode->pSdb, pSma);
mndReleaseStb(pMnode, pDstStb);
return code;
}
terrno = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma);
mndReleaseStb(pMnode, pDstStb);
sdbRelease(pMnode->pSdb, pSma);
if (terrno) return code;
if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tFreeTableTSMAInfo(pTsma);
}
*exist = true;
}
return 0;
}
static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *rsp, bool *exist) {
int32_t code = -1;
SSmaObj * pSma = NULL;
@ -2011,19 +2062,14 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp
}
terrno = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma);
mndReleaseStb(pMnode, pStb);
if (terrno) {
sdbRelease(pSdb, pSma);
return code;
}
if (terrno) return code;
if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tFreeTableTSMAInfo(pTsma);
sdbRelease(pSdb, pSma);
return code;
}
*exist = true;
sdbRelease(pSdb, pSma);
}
return TSDB_CODE_SUCCESS;
}
@ -2047,7 +2093,11 @@ static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
goto _OVER;
}
if (tsmaReq.fetchingTsma) {
code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
} else {
code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
}
if (code) {
goto _OVER;
}
@ -2092,6 +2142,7 @@ static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo *
tstrncpy(pInfo->tb, pTsmaVer->tbName, TSDB_TABLE_NAME_LEN);
tstrncpy(pInfo->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN);
pInfo->dbId = pTsmaVer->dbId;
pInfo->ast = "dummy";// TODO could be freed
*ppTsma = pInfo;
return TSDB_CODE_SUCCESS;
}

View File

@ -129,6 +129,7 @@ typedef enum {
CTG_TASK_GET_TB_TAG,
CTG_TASK_GET_VIEW,
CTG_TASK_GET_TB_TSMA,
CTG_TASK_GET_TSMA,
} CTG_TASK_TYPE;
typedef enum {
@ -391,6 +392,7 @@ typedef struct SCtgJob {
int32_t svrVerNum;
int32_t viewNum;
int32_t tbTsmaNum;
int32_t tsmaNum; // currently, only 1 is possible
} SCtgJob;
typedef struct SCtgMsgCtx {
@ -1116,8 +1118,9 @@ int32_t ctgGetUserCb(SCtgTask* pTask);
int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx, int32_t* fetchIdx, int32_t baseResIdx,
SArray* pList);
int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableTSMAInfoRsp* out,
SCtgTaskReq* tReq);
int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaName);
int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* name, STableTSMAInfoRsp* out,
SCtgTaskReq* tReq, int32_t reqType);
int32_t ctgUpdateTbTSMAEnqueue(SCatalog* pCtg, STSMACache** pTsma, bool syncOp);
int32_t ctgDropTSMAForTbEnqueue(SCatalog* pCtg, SName* pName, bool syncOp);
int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool syncOp);

View File

@ -1832,7 +1832,8 @@ _return:
int32_t ctgGetTbTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, SArray** ppRes) {
STableTSMAInfoRsp tsmasRsp = {0};
int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTableName, &tsmasRsp, NULL);
//TODO get from cache first
int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTableName, &tsmasRsp, NULL, TDMT_MND_GET_TABLE_TSMA);
if (code == TSDB_CODE_MND_SMA_NOT_EXIST) {
code = 0;
goto _return;
@ -1870,6 +1871,41 @@ _return:
CTG_API_LEAVE(code);
}
int32_t ctgGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTsmaName, STableTSMAInfo** pTsma) {
STableTSMAInfoRsp tsmaRsp = {0};
int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTsmaName, &tsmaRsp, NULL, TDMT_MND_GET_TSMA);
if (code == TSDB_CODE_MND_SMA_NOT_EXIST) {
code = 0;
goto _return;
}
CTG_ERR_JRET(code);
ASSERT(tsmaRsp.pTsmas && tsmaRsp.pTsmas->size == 1);
*pTsma = taosArrayGetP(tsmaRsp.pTsmas, 0);
taosArrayDestroy(tsmaRsp.pTsmas);
_return:
if (tsmaRsp.pTsmas) {
tFreeTableTSMAInfoRsp(&tsmaRsp);
}
CTG_RET(code);
}
int32_t catalogGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTsmaName, STableTSMAInfo** pTsma) {
CTG_API_ENTER();
if (!pCtg || !pConn || !pTsmaName) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = 0;
CTG_ERR_JRET(ctgGetTsma(pCtg, pConn, pTsmaName, pTsma));
_return:
CTG_API_LEAVE(code);
}
int32_t catalogClearCache(void) {
CTG_API_ENTER_NOLOCK();

View File

@ -492,6 +492,22 @@ int32_t ctgInitGetTbTSMATask(SCtgJob* pJob, int32_t taskId, void* param) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetTSMATask(SCtgJob* pJob, int32_t taskId, void* param) {
SCtgTask task = {0};
task.type = CTG_TASK_GET_TSMA;
task.taskId = taskId;
task.pJob = pJob;
SCtgTbTSMACtx* pTaskCtx = taosMemoryCalloc(1, sizeof(SCtgTbTSMACtx));
if (!pTaskCtx) CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
task.taskCtx = pTaskCtx;
pTaskCtx->pNames = param;
pTaskCtx->pResList = taosArrayInit(pJob->tsmaNum, sizeof(SMetaRes));
taosArrayPush(pJob->pTasks, &task);
return 0;
}
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, const SCatalogReq* pReq) {
SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
SHashObj* pTb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
@ -589,6 +605,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, con
}
for (int32_t i = 0; i < pJob->tbTsmaNum; ++i) {
// TODO test for it
SName* name = taosArrayGet(pReq->pTableTSMAs, i);
ctgDropTSMAForTbEnqueue(pCtg, name, true);
}
@ -635,6 +652,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
int32_t tbTagNum = (int32_t)taosArrayGetSize(pReq->pTableTag);
int32_t viewNum = (int32_t)ctgGetTablesReqNum(pReq->pView);
int32_t tbTsmaNum = (int32_t)taosArrayGetSize(pReq->pTableTSMAs);
int32_t tsmaNum = (int32_t)taosArrayGetSize(pReq->pTSMAs);
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum +
userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum + viewNum + tbTsmaNum;
@ -671,6 +689,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
pJob->tbTagNum = tbTagNum;
pJob->viewNum = viewNum;
pJob->tbTsmaNum = tbTsmaNum;
pJob->tsmaNum = tsmaNum;
#if CTG_BATCH_FETCH
pJob->pBatchs =
@ -766,6 +785,9 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
if (tbTsmaNum > 0) {
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TSMA, pReq->pTableTSMAs, NULL));
}
if (tsmaNum > 0) {
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TSMA, pReq->pTSMAs, NULL));
}
if (qnodeNum) {
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_QNODE, NULL, NULL));
@ -2612,7 +2634,7 @@ int32_t ctgLaunchGetTbTSMATask(SCtgTask* pTask) {
for (int32_t idx = 0; idx < dbNum; ++idx) {
STablesReq* pReq = taosArrayGet(pCtx->pNames, idx);
ctgGetTbTSMAFromCache(pCtg, pCtx, idx, &fetchIdx, baseResIdx, pReq->pTables);
CTG_ERR_RET(ctgGetTbTSMAFromCache(pCtg, pCtx, idx, &fetchIdx, baseResIdx, pReq->pTables));
baseResIdx += taosArrayGetSize(pReq->pTables);
}
pCtx->fetchNum = taosArrayGetSize(pCtx->pFetches);
@ -2632,12 +2654,100 @@ int32_t ctgLaunchGetTbTSMATask(SCtgTask* pTask) {
SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = pFetch->fetchIdx;
CTG_ERR_RET(ctgGetTbTSMAFromMnode(pCtg, pConn, pName, NULL, &tReq));
CTG_ERR_RET(ctgGetTbTSMAFromMnode(pCtg, pConn, pName, NULL, &tReq, TDMT_MND_GET_TABLE_TSMA));
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetTSMATask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SCtgTbTSMACtx* pCtx = (SCtgTbTSMACtx*)pTask->taskCtx;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SArray* pRes = NULL;
SCtgJob* pJob = pTask->pJob;
// currently, only support fetching one tsma
ASSERT(pCtx->pNames->size == 1);
STablesReq* pReq = taosArrayGet(pCtx->pNames, 0);
ASSERT(pReq->pTables->size == 1);
SName* pTsmaName = taosArrayGet(pReq->pTables, 0);
CTG_ERR_RET(ctgGetTSMAFromCache(pCtg, pCtx, pTsmaName));
if (pCtx->pResList->size == 0) {
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, 0);
if (!pMsgCtx->pBatchs) pMsgCtx->pBatchs = pJob->pBatchs;
SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = 0};
taosArrayPush(pCtx->pResList, &(SMetaRes){0});
CTG_ERR_RET(ctgGetTbTSMAFromMnode(pCtg, pConn, pTsmaName, NULL, &tReq, TDMT_MND_GET_TSMA));
}
return 0;
}
int32_t ctgHandleGetTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SCtgTbTSMACtx* pCtx = pTask->taskCtx;
SMetaRes* pRes = taosArrayGet(pCtx->pResList, 0);
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, 0);
SName* pName = taosArrayGet(pTbReq->pTables, 0);
SRequestConnInfo* pConn = &pTask->pJob->conn;
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
switch (reqType) {
case TDMT_MND_TABLE_META: {
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
if (!CTG_IS_META_NULL(pOut->metaType)) {
CTG_ERR_JRET(ctgUpdateTbMetaToCache(pCtg, pOut, CTG_FLAG_SYNC_OP));
}
} break;
case TDMT_MND_GET_TSMA: {
STableTSMAInfoRsp* pOut = pMsgCtx->out;
pRes->code = 0;
if (pOut->pTsmas->size > 0) {
ASSERT(pOut->pTsmas->size == 1);
pRes->pRes = pOut;
pMsgCtx->out = NULL;
TSWAP(pTask->res, pCtx->pResList);
break;
STableTSMAInfo* pTsma = taosArrayGetP(pOut->pTsmas, 0);
SName dstTbName = *pName;
strcpy(dstTbName.tname, pTsma->targetTb);
SCtgTbMetaCtx stbCtx = {0};
stbCtx.flag = CTG_FLAG_STB;
stbCtx.pName = &dstTbName;
STableMeta* pDstTbMeta = NULL;
(void)ctgReadTbMetaFromCache(pCtg, &stbCtx, &pDstTbMeta);
if (!pDstTbMeta) {
TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pTsma->targetDbFName, dstTbName.tname, NULL, tReq));
} else {
taosMemoryFreeClear(pDstTbMeta);
}
}
} break;
default:
ASSERT(0);
}
_return:
if (code) {
if (TSDB_CODE_MND_SMA_NOT_EXIST == code) {
code = TSDB_CODE_SUCCESS;
} else {
ctgTaskError("Get tsma for %d.%s.%s failed with err: %s", pName->acctId, pName->dbname, pName->tname,
tstrerror(code));
}
}
ctgHandleTaskEnd(pTask, code);
CTG_RET(code);
}
int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
@ -2708,6 +2818,16 @@ int32_t ctgDumpTbTSMARes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpTSMARes(SCtgTask* pTask) {
if (pTask->subTask) {
return TSDB_CODE_SUCCESS;
}
SCtgJob* pJob = pTask->pJob;
pJob->jobRes.pTsmas = pTask->res;
return TSDB_CODE_SUCCESS;
}
int32_t ctgRelaunchGetTbMetaTask(SCtgTask* pTask) {
ctgResetTbMetaTask(pTask);
@ -2826,6 +2946,7 @@ SCtgAsyncFps gCtgAsyncFps[] = {
{ctgInitGetTbTagTask, ctgLaunchGetTbTagTask, ctgHandleGetTbTagRsp, ctgDumpTbTagRes, NULL, NULL},
{ctgInitGetViewsTask, ctgLaunchGetViewsTask, ctgHandleGetViewsRsp, ctgDumpViewsRes, NULL, NULL},
{ctgInitGetTbTSMATask, ctgLaunchGetTbTSMATask, ctgHandleGetTbTSMARsp, ctgDumpTbTSMARes, NULL, NULL},
{ctgInitGetTSMATask, ctgLaunchGetTSMATask, ctgHandleGetTSMARsp, ctgDumpTSMARes, NULL, NULL},
};
int32_t ctgMakeAsyncRes(SCtgJob* pJob) {

View File

@ -3243,7 +3243,7 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
}
tNameGetFullDbName(pName, dbFName);
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
CTG_ERR_RET(ctgAcquireDBCache(pCtg, dbFName, &dbCache));
if (!dbCache) {
ctgDebug("DB %s not in cache", dbFName);
for (int32_t i = 0; i < tbNum; ++i) {
@ -3312,6 +3312,49 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
CTG_RET(code);
}
int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaName) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
SCtgDBCache *pDbCache = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SMetaRes res = {0};
bool found = false;
STSMACache * pTsmaOut = NULL;
tNameGetFullDbName(pTsmaName, dbFName);
CTG_ERR_RET(ctgAcquireDBCache(pCtg, dbFName, &pDbCache));
if (!pDbCache) {
ctgDebug("DB %s not in cache", dbFName);
CTG_RET(code);
}
void * pIter = taosHashIterate(pDbCache->tsmaCache, NULL);
while (pIter && !found) {
SCtgTSMACache* pCtgCache = pIter;
CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock);
int32_t size = pCtgCache ? (pCtgCache->pTsmas ? pCtgCache->pTsmas->size : 0) : 0;
for (int32_t i = 0; i < size; ++i) {
STSMACache* pCache = taosArrayGetP(pCtgCache->pTsmas, i);
if (memcmp(pCache->name, pTsmaName->tname, TSDB_TABLE_NAME_LEN) == 0) {
found = true;
CTG_CACHE_NHIT_INC(CTG_CI_TBL_TSMA, 1);
code = tCloneTbTSMAInfo(pCache, &pTsmaOut);
break;
}
}
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
pIter = taosHashIterate(pDbCache->tsmaCache, pIter);
}
taosHashCancelIterate(pDbCache->tsmaCache, pIter);
if (found) {
res.pRes = pTsmaOut;
taosArrayPush(pCtx->pResList, &res);
}
ctgReleaseDBCache(pCtg, pDbCache);
CTG_RET(code);
}
int32_t ctgUpdateTbTSMAEnqueue(SCatalog *pCtg, STSMACache **pTsma, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));

View File

@ -337,6 +337,7 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug("Got view-meta from mnode, viewFName:%s", target);
break;
}
case TDMT_MND_GET_TSMA:
case TDMT_MND_GET_TABLE_TSMA: {
if (TSDB_CODE_SUCCESS != rspCode) {
if (TSDB_CODE_MND_SMA_NOT_EXIST != rspCode) {
@ -1480,11 +1481,10 @@ int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName*
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableTSMAInfoRsp* out,
SCtgTaskReq* tReq) {
int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* name, STableTSMAInfoRsp* out,
SCtgTaskReq* tReq, int32_t reqType) {
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_TSMA;
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
char tbFName[TSDB_TABLE_FNAME_LEN];

View File

@ -268,7 +268,7 @@ void ctgRemoveTSMARent(SCatalog *pCtg, SCtgDBCache *dbCache) {
CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock);
int32_t size = pCtgCache ? pCtgCache->pTsmas->size : 0;
for (int32_t i = 0; i < size; ++i) {
STSMACache* pCache = taosArrayGet(pCtgCache->pTsmas, i);
STSMACache* pCache = taosArrayGetP(pCtgCache->pTsmas, i);
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSortCompare, ctgTSMAVersionSearchCompare)) {
ctgDebug("tsma removed from rent, viewId: %" PRIx64 " name: %s.%s.%s", pCache->tsmaId, pCache->dbFName, pCache->tb, pCache->name);
}

View File

@ -193,6 +193,9 @@ void ctgFreeSMetaData(SMetaData* pData) {
taosArrayDestroy(pData->pTableTsmas);
pData->pTableTsmas = NULL;
taosArrayDestroy(pData->pTsmas);
pData->pTsmas = NULL;
taosMemoryFreeClear(pData->pSvrVer);
}
@ -625,6 +628,7 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
}
break;
}
case TDMT_MND_GET_TSMA:
case TDMT_MND_GET_TABLE_TSMA: {
if (pCtx->out) {
tFreeTableTSMAInfoRsp(pCtx->out);
@ -814,6 +818,7 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void** pRes) {
*pRes = NULL; // no need to free it
break;
}
case CTG_TASK_GET_TSMA:
case CTG_TASK_GET_TB_TSMA: {
SArray* pArr = (SArray*)*pRes;
int32_t num = taosArrayGetSize(pArr);
@ -986,6 +991,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
taosMemoryFreeClear(pTask->taskCtx);
break;
}
case CTG_TASK_GET_TSMA:
case CTG_TASK_GET_TB_TSMA: {
SCtgTbTSMACtx* pTsmaCtx = pTask->taskCtx;
taosArrayDestroyEx(pTsmaCtx->pResList, ctgFreeTbTSMARes);
@ -2084,6 +2090,7 @@ void ctgDestroySMetaData(SMetaData* pData) {
taosArrayDestroyEx(pData->pDnodeList, ctgFreeDnodeList);
taosArrayDestroyEx(pData->pView, ctgFreeViewMeta);
taosArrayDestroyEx(pData->pTableTsmas, ctgFreeTbTSMAInfo);
taosArrayDestroyEx(pData->pTsmas, ctgFreeTbTSMAInfo);
taosMemoryFreeClear(pData->pSvrVer);
}

View File

@ -282,8 +282,8 @@ SSDataBlock* doNonSortMerge(SOperatorInfo* pOperator) {
idx = NON_SORT_NEXT_SRC(pNonSortMerge, pNonSortMerge->lastSourceIdx);
continue;
}
break;
pNonSortMerge->lastSourceIdx = idx - 1;
break;
}
if (!pBlock) {

View File

@ -7326,9 +7326,6 @@ static const char* jkTSMAOptionTsPrecision = "Precision";
static int32_t tsmaOptionToJson(const void* pObj, SJson* pJson) {
const STSMAOptions* pNode = (const STSMAOptions*)pObj;
int32_t code = nodeListToJson(pJson, jkTSMAOptionFuncs, pNode->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkTSMAOptionCols, pNode->pCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkTSMAOptionInterval, nodeToJson, pNode->pInterval);
}
@ -7341,9 +7338,6 @@ static int32_t tsmaOptionToJson(const void* pObj, SJson* pJson) {
static int32_t jsonToTSMAOption(const SJson* pJson, void* pObj) {
STSMAOptions* pNode = (STSMAOptions*)pObj;
int32_t code = jsonToNodeList(pJson, jkTSMAOptionFuncs, &pNode->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkTSMAOptionCols, &pNode->pCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkTSMAOptionInterval, &pNode->pInterval);
}

View File

@ -919,7 +919,6 @@ void nodesDestroyNode(SNode* pNode) {
}
case QUERY_NODE_TSMA_OPTIONS: {
STSMAOptions* pOptions = (STSMAOptions*)pNode;
nodesDestroyList(pOptions->pCols);
nodesDestroyList(pOptions->pFuncs);
nodesDestroyNode(pOptions->pInterval);
break;

View File

@ -277,7 +277,7 @@ SNode* createShowCompactsStmt(SAstCreateContext* pCxt, ENodeType type);
SNode* createCreateTSMAStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* tsmaName, SNode* pOptions,
SNode* pRealTable, SNode* pInterval);
SNode* createTSMAOptions(SAstCreateContext* pCxt, SNodeList* pFuncs, SNodeList* pCols);
SNode* createTSMAOptions(SAstCreateContext* pCxt, SNodeList* pFuncs);
SNode* createDefaultTSMAOptions(SAstCreateContext* pCxt);
SNode* createDropTSMAStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pRealTable);
SNode* createShowCreateTSMAStmt(SAstCreateContext* pCxt, SNode* pRealTable);

View File

@ -111,6 +111,7 @@ typedef struct SParseMetaCache {
SHashObj* pTableCfg; // key is tbFName, element is STableCfg*
SHashObj* pViews; // key is viewFName, element is SViewMeta*
SHashObj* pTableTSMAs; // key is tbFName, elements are SArray<STableTSMAInfo*>
SHashObj* pTSMAs; // key is tsmaFName, elemetns are STableTSMAInfo*
SArray* pDnodes; // element is SEpSet
bool dnodeRequired;
} SParseMetaCache;
@ -155,6 +156,7 @@ int32_t reserveTableIndexInCache(int32_t acctId, const char* pDb, const char* pT
int32_t reserveTableCfgInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveDnodeRequiredInCache(SParseMetaCache* pMetaCache);
int32_t reserveTableTSMAInfoInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveTSMAInfoInCache(int32_t acctId, const char* pDb, const char* pTsmaName, SParseMetaCache* pMetaCache);
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta);
int32_t getViewMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta);
int32_t buildTableMetaFromViewMeta(STableMeta** pMeta, SViewMeta* pViewMeta);
@ -171,6 +173,7 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* pTable, SNodeList* pHint);
int32_t getTableTsmasFromCache(SParseMetaCache* pMetaCache, const SName* pTbName, SArray** pTsmas);
int32_t getTsmaFromCache(SParseMetaCache* pMetaCache, const SName* pTsmaName, STableTSMAInfo** pTsma);
/**
* @brief return a - b with overflow check

View File

@ -582,7 +582,10 @@ db_kind_opt(A) ::= SYSTEM.
/************************************************ tsma ********************************************************/
cmd ::= CREATE TSMA not_exists_opt(B) tsma_name(C)
ON full_table_name(E) tsma_opt(D) INTERVAL NK_LP duration_literal(F) NK_RP. { pCxt->pRootNode = createCreateTSMAStmt(pCxt, B, &C, D, E, releaseRawExprNode(pCxt, F)); }
ON full_table_name(E) tsma_func_list(D)
INTERVAL NK_LP duration_literal(F) NK_RP. { pCxt->pRootNode = createCreateTSMAStmt(pCxt, B, &C, D, E, releaseRawExprNode(pCxt, F)); }
cmd ::= CREATE RECURSIVE TSMA not_exists_opt(B) tsma_name(C)
ON full_table_name(D) INTERVAL NK_LP duration_literal(E) NK_RP. { pCxt->pRootNode = createCreateTSMAStmt(pCxt, B, &C, NULL, D, releaseRawExprNode(pCxt, E)); }
cmd ::= DROP TSMA exists_opt(B) full_tsma_name(C). { pCxt->pRootNode = createDropTSMAStmt(pCxt, B, C); }
cmd ::= SHOW CREATE TSMA full_tsma_name(B). { pCxt->pRootNode = createShowCreateTSMAStmt(pCxt, B); }
cmd ::= SHOW db_name_cond_opt(B) TSMAS. { pCxt->pRootNode = createShowTSMASStmt(pCxt, B); }
@ -590,20 +593,9 @@ cmd ::= SHOW db_name_cond_opt(B) TSMAS.
full_tsma_name(A) ::= tsma_name(B). { A = createRealTableNodeForIndexName(pCxt, NULL, &B); }
full_tsma_name(A) ::= db_name(B) NK_DOT tsma_name(C). { A = createRealTableNodeForIndexName(pCxt, &B, &C); }
%type tsma_opt { SNode* }
%destructor tsma_opt { nodesDestroyNode($$); }
tsma_opt(A) ::= . { A = createDefaultTSMAOptions(pCxt); }
tsma_opt(A) ::= FUNCTION NK_LP tsma_func_list(B) NK_RP
COLUMN NK_LP col_name_list(C) NK_RP. { A = createTSMAOptions(pCxt, B, C); }
%type tsma_func_list { SNodeList* }
%destructor tsma_func_list { nodesDestroyList($$); }
tsma_func_list(A) ::= tsma_func_name(B). { A = createNodeList(pCxt, B); }
tsma_func_list(A) ::= tsma_func_list(B) NK_COMMA tsma_func_name(C). { A = addNodeToList(pCxt, B, C); }
%type tsma_func_name { SNode* }
%destructor tsma_func_name { nodesDestroyNode($$); }
tsma_func_name(A) ::= sma_func_name(B). { A = createFunctionNode(pCxt, &B, NULL); }
%type tsma_func_list { SNode* }
%destructor tsma_func_list { nodesDestroyNode($$); }
tsma_func_list(A) ::= FUNCTION NK_LP func_list(B) NK_RP. { A = createTSMAOptions(pCxt, B); }
/************************************************ create index ********************************************************/
cmd ::= CREATE SMA INDEX not_exists_opt(D)

View File

@ -2846,7 +2846,19 @@ SNode* createCreateTSMAStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken*
CHECK_OUT_OF_MEM(pStmt);
pStmt->ignoreExists = ignoreExists;
if (!pOptions) {
// recursive tsma
pStmt->pOptions = (STSMAOptions*)nodesMakeNode(QUERY_NODE_TSMA_OPTIONS);
if (!pStmt->pOptions) {
nodesDestroyNode((SNode*)pStmt);
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "Out of memory");
return NULL;
}
pStmt->pOptions->recursiveTsma = true;
} else {
pStmt->pOptions = (STSMAOptions*)pOptions;
}
pStmt->pOptions->pInterval = pInterval;
COPY_STRING_FORM_ID_TOKEN(pStmt->tsmaName, tsmaName);
@ -2858,7 +2870,7 @@ SNode* createCreateTSMAStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken*
return (SNode*)pStmt;
}
SNode* createTSMAOptions(SAstCreateContext* pCxt, SNodeList* pFuncs, SNodeList* pCols) {
SNode* createTSMAOptions(SAstCreateContext* pCxt, SNodeList* pFuncs) {
CHECK_PARSER_STATUS(pCxt);
/*
SNode *pNode1, *pNode2;
@ -2888,8 +2900,10 @@ SNode* createTSMAOptions(SAstCreateContext* pCxt, SNodeList* pFuncs, SNodeList*
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "Out of memory");
return NULL;
}
// TODO check duplicate funcs somewhere
// TODO check non supported funcs somewhere
pOptions->pFuncs = pFuncs;
pOptions->pCols = pCols;
//pOptions->pCols = pCols;
//nodesDestroyList(pFuncs);
//nodesDestroyList(pCols);

View File

@ -766,10 +766,24 @@ static int32_t collectMetaKeyFromDropViewStmt(SCollectMetaKeyCxt* pCxt, SDropVie
static int32_t collectMetaKeyFromCreateTSMAStmt(SCollectMetaKeyCxt* pCxt, SCreateTSMAStmt* pStmt) {
int32_t code;
if (pStmt->pOptions->recursiveTsma) {
// if creating recursive tsma, the tablename is tsmaName
code = reserveTSMAInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
char dstTbName[TSDB_TABLE_NAME_LEN] = {0};
// TODO check len
sprintf(dstTbName, "%s"TSMA_RES_STB_POSTFIX, pStmt->tableName);
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, dstTbName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, dstTbName, pCxt->pMetaCache);
}
}
} else {
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
}

View File

@ -201,6 +201,7 @@ static SKeyword keywordTable[] = {
{"RATIO", TK_RATIO},
{"PAUSE", TK_PAUSE},
{"READ", TK_READ},
{"RECURSIVE", TK_RECURSIVE},
{"REDISTRIBUTE", TK_REDISTRIBUTE},
{"RENAME", TK_RENAME},
{"REPLACE", TK_REPLACE},

View File

@ -719,7 +719,27 @@ static int32_t getTableTsmas(STranslateContext* pCxt, const SName* pName, SArray
.mgmtEps = pParCxt->mgmtEpSet};
code = catalogGetTableTsmas(pParCxt->pCatalog, &conn, pName, ppTsmas);
}
if (code) parserError("0x%" PRIx64 " getDnodeList error, code:%s", pCxt->pParseCxt->requestId, tstrerror(code));
if (code)
parserError("0x%" PRIx64 " get table tsma for : %s.%s error, code:%s", pCxt->pParseCxt->requestId, pName->dbname,
pName->tname, tstrerror(code));
return code;
}
static int32_t getTsma(STranslateContext* pCxt, const SName* pName, STableTSMAInfo** pTsma) {
int32_t code = 0;
SParseContext* pParCxt = pCxt->pParseCxt;
if (pParCxt->async) {
code = getTsmaFromCache(pCxt->pMetaCache, pName, pTsma);
} else {
SRequestConnInfo conn = {.pTrans = pParCxt->pTransporter,
.requestId = pParCxt->requestId,
.requestObjRefId = pParCxt->requestRid,
.mgmtEps = pParCxt->mgmtEpSet};
code = catalogGetTsma(pParCxt->pCatalog, &conn, pName, pTsma);
}
if (code)
parserError("0x%" PRIx64 " get tsma for: %s.%s error, code:%s", pCxt->pParseCxt->requestId, pName->dbname,
pName->tname, tstrerror(code));
return code;
}
@ -10414,13 +10434,77 @@ static bool sortColWithColId(SNode* pNode1, SNode* pNode2) {
return pCol1->colId < pCol2->colId;
}
static int32_t buildTSMAAstMakeConcatFuncNode(SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, const SNode* pTbNameFunc,
SFunctionNode** pConcatFuncOut) {
int32_t code = 0;
SFunctionNode* pSubstrFunc = NULL;
SNode* pRes = NULL;
SValueNode* pTsmaNameHashVNode = NULL;
SFunctionNode* pConcatFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (!pConcatFunc) code = TSDB_CODE_OUT_OF_MEMORY;
if (code == TSDB_CODE_SUCCESS) {
snprintf(pConcatFunc->functionName, TSDB_FUNC_NAME_LEN, "concat");
code = nodesListMakeStrictAppend(&pConcatFunc->pParameterList, nodesMakeNode(QUERY_NODE_VALUE));
}
if (TSDB_CODE_SUCCESS == code) {
pTsmaNameHashVNode = (SValueNode*)nodesListGetNode(pConcatFunc->pParameterList, 0);
pTsmaNameHashVNode->literal = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1);
if (!pTsmaNameHashVNode->literal) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
sprintf(pTsmaNameHashVNode->literal, "%s", pReq->name);
int32_t len = taosCreateMD5Hash(pTsmaNameHashVNode->literal, strlen(pTsmaNameHashVNode->literal));
pTsmaNameHashVNode->literal[len] = '_';
pTsmaNameHashVNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
pTsmaNameHashVNode->node.resType.bytes = strlen(pTsmaNameHashVNode->literal);
}
if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->recursiveTsma) {
pSubstrFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (!pSubstrFunc) code = TSDB_CODE_OUT_OF_MEMORY;
if (TSDB_CODE_SUCCESS == code) {
snprintf(pSubstrFunc->functionName, TSDB_FUNC_NAME_LEN, "substr");
code = nodesListMakeStrictAppend(&pSubstrFunc->pParameterList, nodesCloneNode(pTbNameFunc));
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pSubstrFunc->pParameterList, nodesMakeNode(QUERY_NODE_VALUE));
if (TSDB_CODE_SUCCESS == code) {
SValueNode* pV = (SValueNode*)pSubstrFunc->pParameterList->pTail->pNode;
pV->literal = strdup("34");
if (!pV->literal) code = TSDB_CODE_OUT_OF_MEMORY;
pV->isDuration = false;
pV->translate = false;
pV->node.resType.type = TSDB_DATA_TYPE_INT;
pV->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
}
}
}
if (TSDB_CODE_SUCCESS == code) {
if (pSubstrFunc) {
code = nodesListAppend(pConcatFunc->pParameterList, (SNode*)pSubstrFunc);
} else {
code = nodesListStrictAppend(pConcatFunc->pParameterList, nodesCloneNode(pTbNameFunc));
}
}
if (TSDB_CODE_SUCCESS == code) {
*pConcatFuncOut = pConcatFunc;
} else {
nodesDestroyNode((SNode*)pSubstrFunc);
nodesDestroyNode((SNode*)pConcatFunc);
}
return code;
}
static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq,
STableMeta* pTableMeta) {
const char* tbName, int32_t numOfTags, const SSchema* pTags) {
int32_t code = TSDB_CODE_SUCCESS;
SSampleAstInfo info = {0};
info.createSmaIndex = true;
info.pDbName = pStmt->dbName;
info.pTableName = pStmt->tableName;
info.pTableName = tbName;
info.pFuncs = nodesCloneList(pStmt->pOptions->pFuncs);
info.pInterval = nodesCloneNode(pStmt->pOptions->pInterval);
if (!info.pFuncs || !info.pInterval) code = TSDB_CODE_OUT_OF_MEMORY;
@ -10435,40 +10519,17 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code == TSDB_CODE_SUCCESS) {
SFunctionNode* pConcatFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (!pConcatFunc || !pValue) code = TSDB_CODE_OUT_OF_MEMORY;
SFunctionNode* pConcatFunc = NULL;
code = buildTSMAAstMakeConcatFuncNode(pStmt, pReq, (const SNode*)pTbnameFunc, &pConcatFunc);
if (code == TSDB_CODE_SUCCESS) {
pValue->literal = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
if (!pValue->literal) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code == TSDB_CODE_SUCCESS) {
strcpy(pValue->literal, pReq->name);
int32_t len = taosCreateMD5Hash(pValue->literal, strlen(pValue->literal));
pValue->literal[len] = '_';
snprintf(pConcatFunc->functionName, TSDB_FUNC_NAME_LEN, "concat");
pValue->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
pValue->node.resType.bytes = strlen(pValue->literal);
pValue->isDuration = false;
pValue->isNull = false;
code = nodesListMakeAppend(&pConcatFunc->pParameterList, (SNode*)pValue);
}
if (code == TSDB_CODE_SUCCESS) {
code = nodesListStrictAppend(pConcatFunc->pParameterList, nodesCloneNode((SNode*)pTbnameFunc));
}
if (code) {
nodesDestroyNode((SNode*)pConcatFunc);
nodesDestroyNode((SNode*)pValue);
} else {
info.pSubTable = (SNode*)pConcatFunc;
}
}
}
if (TSDB_CODE_SUCCESS == code) {
// append partition by tags
int32_t numOfCols = pTableMeta->tableInfo.numOfColumns, numOfTags = pTableMeta->tableInfo.numOfTags;
for (int32_t idx = numOfCols; idx < numOfCols + numOfTags; ++idx) {
SNode* pTagCol = createColumnNodeWithName(pTableMeta->schema[idx].name);
for (int32_t idx = 0; idx < numOfTags; ++idx) {
SNode* pTagCol = createColumnNodeWithName(pTags[idx].name);
if (!pTagCol) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
@ -10478,7 +10539,7 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC
}
}
if (code == TSDB_CODE_SUCCESS)
if (code == TSDB_CODE_SUCCESS && !pStmt->pOptions->recursiveTsma)
code = fmCreateStateFuncs(info.pFuncs);
if (code == TSDB_CODE_SUCCESS) {
@ -10488,88 +10549,50 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC
return code;
}
static char* defaultTSMAFuncs[4] = {"max", "min", "sum", "count"};
static int32_t createColumnBySchema(const SSchema* pSchema, SColumnNode** ppCol) {
*ppCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
int32_t code = 0;
if (!*ppCol) return TSDB_CODE_OUT_OF_MEMORY;
static int32_t
translateTSMAFuncs(STranslateContext * pCxt, SCreateTSMAStmt* pStmt, STableMeta* pTableMeta) {
(*ppCol)->colId = pSchema->colId;
strcpy((*ppCol)->colName, pSchema->name);
return TSDB_CODE_SUCCESS;
}
static int32_t rewriteTSMAFuncs(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, int32_t columnNum,
const SSchema* pCols) {
int32_t code = TSDB_CODE_SUCCESS;
if (!pStmt->pOptions->pFuncs) {
// add default funcs for TSMA
for (int32_t i = 0; i < sizeof(defaultTSMAFuncs) / sizeof(char*); ++i) {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (!pFunc || TSDB_CODE_SUCCESS != nodesListMakeAppend(&pStmt->pOptions->pFuncs, (SNode*)pFunc)) {
nodesDestroyNode((SNode*)pFunc);
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
strcpy(pFunc->functionName, defaultTSMAFuncs[i]);
}
// add all numeric cols
SNode* pNode;
SFunctionNode* pFunc = NULL;
SColumnNode* pCol = NULL;
if (pStmt->pOptions->recursiveTsma) {
// recursive tsma, create func list from base tsma
code = fmCreateStateMergeFuncs(pStmt->pOptions->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
const SSchema* schema = &pTableMeta->schema[i];
if (!IS_NUMERIC_TYPE(schema->type)) continue;
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (!pCol || TSDB_CODE_SUCCESS != nodesListMakeAppend(&pStmt->pOptions->pCols, (SNode*)pCol)) {
nodesDestroyNode((SNode*)pCol);
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
strcpy(pCol->colName, schema->name);
pCol->colId = schema->colId;
int32_t i = 0;
FOREACH(pNode, pStmt->pOptions->pFuncs) {
// rewrite all func parameters with tsma dest tb cols
pFunc = (SFunctionNode*)pNode;
const SSchema* pSchema = pCols + i;
code = createColumnBySchema(pSchema, &pCol);
if (code) break;
nodesListErase(pFunc->pParameterList, pFunc->pParameterList->pHead);
nodesListPushFront(pFunc->pParameterList, (SNode*)pCol);
//TODO what if exceeds the max size
snprintf(pFunc->node.userAlias, TSDB_COL_NAME_LEN, "%s", pSchema->name);
++i;
}
}
} else {
SNode* pNode;
FOREACH(pNode, pStmt->pOptions->pCols) {
SColumnNode* pCol = (SColumnNode*)pNode;
for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
const SSchema* pSchema = &pTableMeta->schema[i];
if (strcmp(pSchema->name, pCol->colName) == 0) {
pCol->colId = pSchema->colId;
break;
}
}
}
}
nodesSortList(&pStmt->pOptions->pCols, sortColWithColId);
SNode* pNode;
FOREACH(pNode, pStmt->pOptions->pFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
int32_t funcId = fmGetFuncId(pFunc->functionName);
if (funcId < 0) {
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
pFunc = (SFunctionNode*)pNode;
// TODO test func params with exprs
pCol = (SColumnNode*)pFunc->pParameterList->pHead->pNode;
snprintf(pFunc->node.userAlias, TSDB_COL_NAME_LEN, "%s(%s)", pFunc->functionName, pCol->colName);
}
pFunc->funcId = funcId;
}
nodesSortList(&pStmt->pOptions->pFuncs, sortFuncWithFuncId);
if (TSDB_CODE_SUCCESS == code) {
// assemble funcs with columns
SNode *pNode1, *pNode2;
SNodeList* pTSMAFuncs = nodesMakeList();
if (!pTSMAFuncs) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
FOREACH(pNode1, pStmt->pOptions->pFuncs) {
FOREACH(pNode2, pStmt->pOptions->pCols) {
SFunctionNode* pFunc = (SFunctionNode*)nodesCloneNode(pNode1);
SColumnNode* pCol = (SColumnNode*)nodesCloneNode(pNode2);
if (!pFunc || !pCol ||
(TSDB_CODE_SUCCESS != nodesListMakeAppend(&pFunc->pParameterList, (SNode*)pCol) || (pCol = NULL)) ||
TSDB_CODE_SUCCESS != nodesListAppend(pTSMAFuncs, (SNode*)pFunc)) {
nodesDestroyNode((SNode*)pFunc);
nodesDestroyNode((SNode*)pCol);
nodesDestroyList(pTSMAFuncs);
return TSDB_CODE_OUT_OF_MEMORY;
}
// TODO what if exceeds the max size
snprintf(pFunc->node.userAlias, TSDB_COL_NAME_LEN, "%s(%s)", pFunc->functionName,
((SColumnNode*)pNode2)->colName);
}
}
nodesDestroyList(pStmt->pOptions->pFuncs);
pStmt->pOptions->pFuncs = pTSMAFuncs;
nodesSortList(&pStmt->pOptions->pFuncs, sortFuncWithFuncId);
}
return code;
}
@ -10586,20 +10609,54 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL;
//TODO 在使用该tableName时, 如果确定其其实是tsma name, 那么避免将此作为tbname进行catalog 获取.
STableTSMAInfo *pRecursiveTsma = NULL;
int32_t numOfCols = 0, numOfTags = 0;
SSchema* pCols = NULL, *pTags = NULL;
if (pStmt->pOptions->recursiveTsma) {
code = getTsma(pCxt, &name, &pRecursiveTsma);
if (code == TSDB_CODE_SUCCESS) {
SNode* pNode;
if (TSDB_CODE_SUCCESS != nodesStringToNode(pRecursiveTsma->ast, &pNode)) {
return TSDB_CODE_TSMA_INVALID_STAT;
}
SSelectStmt* pSelect = (SSelectStmt*)pNode;
FOREACH(pNode, pSelect->pProjectionList) {
SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
nodesListMakeStrictAppend(&pStmt->pOptions->pFuncs, nodesCloneNode(pNode));
}
nodesDestroyNode((SNode*)pSelect);
}
memset(&name, 0, sizeof(SName));
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pRecursiveTsma->targetTb, &name), pReq->stb);
numOfCols = pRecursiveTsma->pUsedCols->size; // TODO merge pUsedCols and pTags with one SSchema array
numOfTags = pRecursiveTsma->pTags->size;
pCols = pRecursiveTsma->pUsedCols->pData;
pTags = pRecursiveTsma->pTags->pData;
} else {
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
numOfCols = pTableMeta->tableInfo.numOfColumns;
numOfTags = pTableMeta->tableInfo.numOfTags;
pCols = pTableMeta->schema;
pTags = pTableMeta->schema + numOfCols;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexDstVgId(pCxt, pStmt->dbName, pStmt->tableName, &pReq->dstVgId);
//code = getSmaIndexDstVgId(pCxt, pStmt->dbName, pStmt->tableName, &pReq->dstVgId);
}
if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexSql(pCxt, &pReq->sql, &pReq->sqlLen);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateTSMAFuncs(pCxt, pStmt, pTableMeta);
code = rewriteTSMAFuncs(pCxt, pStmt, numOfCols, pCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildTSMAAst(pCxt, pStmt, pReq, pTableMeta);
code = buildTSMAAst(pCxt, pStmt, pReq, pStmt->pOptions->recursiveTsma ? pRecursiveTsma->targetTb : pStmt->tableName,
numOfTags, pTags);
}
/*
if (TSDB_CODE_SUCCESS == code) {

View File

@ -16,6 +16,8 @@
#include "parUtil.h"
#include "cJSON.h"
#include "querynodes.h"
#include "tarray.h"
#include "tlog.h"
#define USER_AUTH_KEY_MAX_LEN TSDB_USER_LEN + TSDB_TABLE_FNAME_LEN + 2
@ -726,6 +728,9 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
if (TSDB_CODE_SUCCESS == code) {
code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pTableTSMAs);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildTableReqFromDb(pMetaCache->pTSMAs, &pCatalogReq->pTSMAs);
}
#ifdef TD_ENTERPRISE
if (TSDB_CODE_SUCCESS == code) {
code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pView);
@ -870,6 +875,9 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet
if (TSDB_CODE_SUCCESS == code) {
code = putDbTableDataToCache(pCatalogReq->pTableTSMAs, pMetaData->pTableTsmas, &pMetaCache->pTableTSMAs);
}
if (TSDB_CODE_SUCCESS == code) {
code = putDbTableDataToCache(pCatalogReq->pTSMAs, pMetaData->pTsmas, &pMetaCache->pTSMAs);
}
#ifdef TD_ENTERPRISE
if (TSDB_CODE_SUCCESS == code) {
code = putDbTableDataToCache(pCatalogReq->pView, pMetaData->pView, &pMetaCache->pViews);
@ -1144,6 +1152,10 @@ int32_t reserveTableTSMAInfoInCache(int32_t acctId, const char *pDb, const char
return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableTSMAs);
}
int32_t reserveTSMAInfoInCache(int32_t acctId, const char* pDb, const char* pTsmaName, SParseMetaCache* pMetaCache) {
return reserveTableReqInDbCache(acctId, pDb, pTsmaName, &pMetaCache->pTSMAs);
}
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
@ -1161,8 +1173,7 @@ int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName,
int32_t getTableTsmasFromCache(SParseMetaCache* pMetaCache, const SName* pTbName, SArray** pTsmas) {
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTbName, tbFName);
STableTSMAInfoRsp *pTsmasRsp = NULL;
STableTSMAInfo* pTsma = NULL;
STableTSMAInfoRsp* pTsmasRsp = NULL;
int32_t code = getMetaDataFromHash(tbFName, strlen(tbFName), pMetaCache->pTableTSMAs, (void**)&pTsmasRsp);
if (TSDB_CODE_SUCCESS == code && pTsmasRsp) {
*pTsmas = pTsmasRsp->pTsmas;
@ -1170,6 +1181,18 @@ int32_t getTableTsmasFromCache(SParseMetaCache* pMetaCache, const SName* pTbName
return TSDB_CODE_SUCCESS;
}
int32_t getTsmaFromCache(SParseMetaCache* pMetaCache, const SName* pTsmaName, STableTSMAInfo** pTsma) {
char tsmaFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTsmaName, tsmaFName);
STableTSMAInfoRsp* pTsmaRsp = NULL;
int32_t code = getMetaDataFromHash(tsmaFName, strlen(tsmaFName), pMetaCache->pTSMAs, (void**)&pTsmaRsp);
if (TSDB_CODE_SUCCESS == code && pTsmaRsp) {
ASSERT(pTsmaRsp->pTsmas->size == 1);
*pTsma = taosArrayGetP(pTsmaRsp->pTsmas, 0);
}
return code;
}
STableCfg* tableCfgDup(STableCfg* pCfg) {
STableCfg* pNew = taosMemoryMalloc(sizeof(*pNew));
@ -1242,10 +1265,12 @@ void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
destoryParseTablesMetaReqHash(pMetaCache->pTableMeta);
destoryParseTablesMetaReqHash(pMetaCache->pTableVgroup);
destoryParseTablesMetaReqHash(pMetaCache->pViews);
destoryParseTablesMetaReqHash(pMetaCache->pTSMAs);
} else {
taosHashCleanup(pMetaCache->pTableMeta);
taosHashCleanup(pMetaCache->pTableVgroup);
taosHashCleanup(pMetaCache->pViews);
taosHashCleanup(pMetaCache->pTSMAs);
}
taosHashCleanup(pMetaCache->pDbVgroup);
taosHashCleanup(pMetaCache->pDbCfg);

View File

@ -271,6 +271,7 @@ void destoryCatalogReq(SCatalogReq *pCatalogReq) {
taosArrayDestroy(pCatalogReq->pView);
#endif
taosArrayDestroy(pCatalogReq->pTableTSMAs);
taosArrayDestroy(pCatalogReq->pTSMAs);
} else {
taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq);
taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq);
@ -278,6 +279,7 @@ void destoryCatalogReq(SCatalogReq *pCatalogReq) {
taosArrayDestroyEx(pCatalogReq->pView, destoryTablesReq);
#endif
taosArrayDestroyEx(pCatalogReq->pTableTSMAs, destoryTablesReq);
taosArrayDestroyEx(pCatalogReq->pTSMAs, destoryTablesReq);
}
taosArrayDestroy(pCatalogReq->pUdf);
taosArrayDestroy(pCatalogReq->pIndex);

View File

@ -1712,6 +1712,7 @@ static const YYCODETYPE yyFallback[] = {
0, /* SYSTEM => nothing */
0, /* TSMA => nothing */
0, /* INTERVAL => nothing */
0, /* RECURSIVE => nothing */
0, /* TSMAS => nothing */
0, /* FUNCTION => nothing */
0, /* INDEX => nothing */

View File

@ -6168,7 +6168,7 @@ static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbN
// TODO test child tbname too long
// if with tsma, we replace func tbname with substr(tbname, 34)
pRewrittenFunc->funcId = fmGetFuncId("substr");
snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "substr(tbname, 34)");
snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "substr");
pValue->node.resType.type = TSDB_DATA_TYPE_INT;
pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
pValue->literal = taosMemoryCalloc(1, 16);
@ -6184,7 +6184,7 @@ static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbN
} else if (code == TSDB_CODE_SUCCESS) {
// if no tsma, we replace func tbname with concat('', tbname)
pRewrittenFunc->funcId = fmGetFuncId("concat");
snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "concat('', tbname)");
snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "concat");
pValue->node.resType = ((SExprNode*)(*pTbNameNode))->resType;
pValue->literal = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1);
@ -6387,6 +6387,7 @@ static int32_t tsmaOptRevisePlan2(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent,
pColNode = (SColumnNode*)pScanListCell->pNode;
pScanListCell = pScanListCell->pNext;
pColNode->node.resType = pPartial->node.resType;
// currently we assume that the first parameter must be the scan column
nodesListErase(pMerge->pParameterList, pMerge->pParameterList->pHead);
// TODO STRICT
nodesListPushFront(pMerge->pParameterList, nodesCloneNode((SNode*)pColNode));

View File

@ -322,6 +322,25 @@ int32_t queryBuildGetTableTSMAMsg(void *input, char **msg, int32_t msgSize, int3
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetTSMAMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen,
void *(*mallcFp)(int64_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
STableTSMAInfoReq req = {0};
req.fetchingTsma = true;
strncpy(req.name, input, sizeof(req.name) - 1);
int32_t bufLen = tSerializeTableTSMAInfoReq(NULL, 0, &req);
void * pBuf = (*mallcFp)(bufLen);
tSerializeTableTSMAInfoReq(pBuf, bufLen, &req);
*msg = pBuf;
*msgLen = bufLen;
return TSDB_CODE_SUCCESS;
}
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
SUseDbOutput *pOut = output;
SUseDbRsp usedbRsp = {0};
@ -731,6 +750,7 @@ void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryBuildGetSerVerMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_VIEW_META)] = queryBuildGetViewMetaMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_TSMA)] = queryBuildGetTableTSMAMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TSMA)] = queryBuildGetTSMAMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
@ -747,6 +767,7 @@ void initQueryModuleMsgHandle() {
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_VIEW_META)] = queryProcessGetViewMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_TSMA)] = queryProcessGetTbTSMARsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TSMA)] = queryProcessGetTbTSMARsp;
}
#pragma GCC diagnostic pop

View File

@ -1,3 +1,4 @@
from os import name
from random import randrange
import taos
import time
@ -23,6 +24,7 @@ class TSMA:
class UsedTsma:
TS_MIN = '-9223372036854775808'
TS_MAX = '9223372036854775806'
TSMA_RES_STB_POSTFIX = '_tsma_res_stb_'
def __init__(self) -> None:
self.name = '' ## tsma name or table name
@ -48,6 +50,9 @@ class UsedTsma:
def __repr__(self) -> str:
return self.__str__()
def setIsTsma(self):
self.is_tsma_ = self.name.endswith(self.TSMA_RES_STB_POSTFIX)
class TSMAQueryContext:
def __init__(self) -> None:
self.sql = ''
@ -105,7 +110,7 @@ class TSMAQueryContextBuilder:
def should_query_with_tsma(self, tsma_name: str, ts_begin: str, ts_end: str) -> 'TSMAQueryContextBuilder':
used_tsma: UsedTsma = UsedTsma()
used_tsma.name = tsma_name + '_tsma_res_stb'
used_tsma.name = tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX
used_tsma.time_range_start = self.to_timestamp(ts_begin)
used_tsma.time_range_end = self.to_timestamp(ts_end)
used_tsma.is_tsma_ = True
@ -139,8 +144,7 @@ class TSMATestContext:
if idx >= 0:
words = row[idx:].split(' ')
used_tsma.name = words[3]
if used_tsma.name.endswith('tsma_res_stb'):
used_tsma.is_tsma_ = True
used_tsma.setIsTsma()
else:
idx = row.find('Time Range:')
if idx >= 0:
@ -170,7 +174,7 @@ class TSMATestContext:
tdLog.exit('check explain failed for sql: %s \nexpect: %s \nactual: %s' % (sql, str(expect), str(query_ctx)))
def check_result(self, sql: str):
#tdSql.execute("alter local 'querySmaOptimize' '1'")
tdSql.execute("alter local 'querySmaOptimize' '1'")
tsma_res = tdSql.getResult(sql)
tdSql.execute("alter local 'querySmaOptimize' '0'")
@ -185,6 +189,7 @@ class TSMATestContext:
for row_no_tsma, row_tsma in zip(no_tsma_res, tsma_res):
if row_no_tsma != row_tsma:
tdLog.exit("comparing tsma res for: %s got different row data: no tsma row: %s, tsma row: %s" % (sql, str(row_no_tsma), str(row_tsma)))
tdLog.info('result check succeed for sql: %s. \n tsma-res: %s. \nno_tsma-res: %s' % (sql, str(tsma_res), str(no_tsma_res)))
def check_sql(self, sql: str, expect: TSMAQueryContext):
self.check_explain(sql, expect=expect)
@ -298,13 +303,18 @@ class TDTestCase:
startTs=paraDict["startTs"],tsStep=paraDict["tsStep"])
return
def create_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, col_list: list, interval: str):
def create_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, interval: str):
tdSql.execute('use %s' % db)
sql = "create tsma %s on %s.%s function(%s) column(%s) interval(%s)" % (tsma_name, db, tb, ','.join(func_list), ','.join(col_list), interval)
sql = "CREATE TSMA %s ON %s.%s FUNCTION(%s) INTERVAL(%s)" % (tsma_name, db, tb, ','.join(func_list), interval)
tdSql.execute(sql, queryTimes=1)
def create_recursive_tsma(self, base_tsma_name: str, new_tsma_name: str, db: str, interval: str):
tdSql.execute('use %s' % db, queryTimes=1)
sql = 'CREATE RECURSIVE TSMA %s ON %s.%s INTERVAL(%s)' % (new_tsma_name, db, base_tsma_name, interval)
tdSql.execute(sql, queryTimes=1)
def drop_tsma(self, tsma_name: str, db: str):
sql = 'drop tsma %s.%s' % (db, tsma_name)
sql = 'DROP TSMA %s.%s' % (db, tsma_name)
tdSql.execute(sql, queryTimes=1)
def check_explain_res_has_row(self, plan_str_expect: str, explain_output):
@ -323,10 +333,13 @@ class TDTestCase:
def test_query_with_tsma(self):
self.init_data()
self.create_tsma('tsma1', 'test', 'meters', ['avg'], ['c1', 'c2'], '5m')
self.create_tsma('tsma2', 'test', 'meters', ['avg'], ['c1', 'c2'], '30m')
self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m')
self.create_recursive_tsma('tsma1', 'tsma3', 'test', '20m')
self.create_recursive_tsma('tsma2', 'tsma4', 'test', '1h')
## why need 5s, calculation not finished yet.
time.sleep(5)
time.sleep(9999999)
#time.sleep(9999999)
self.test_query_with_tsma_interval()
self.test_query_with_tsma_agg()