create tsma request

This commit is contained in:
wangjiaming0909 2023-11-24 09:47:35 +08:00
parent 5589cf0309
commit 8e2977d083
7 changed files with 297 additions and 80 deletions

View File

@ -312,6 +312,8 @@
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_CREATE, "stream-create", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG)
TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8

View File

@ -615,10 +615,11 @@ typedef struct SShowCreateTSMAStmt {
}SShowCreateTSMAStmt;
typedef struct SDropTSMAStmt {
ENodeType type;
bool ignoreNotExists;
char dbName[TSDB_DB_NAME_LEN];
char tsmaName[TSDB_INDEX_NAME_LEN];
ENodeType type;
bool ignoreNotExists;
char dbName[TSDB_DB_NAME_LEN];
char tsmaName[TSDB_INDEX_NAME_LEN];
SMDropSmaReq* pReq;
} SDropTSMAStmt;
#ifdef __cplusplus

View File

@ -225,6 +225,10 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -55,6 +55,20 @@ static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
typedef struct SCreateTSMACxt {
SMnode * pMnode;
const SRpcMsg * pRpcReq;
const SMCreateSmaReq *pCreateSmaReq;
const SDbObj * pDb;
const SStbObj * pSrcStb;
// TODO normal table
SSmaObj * pSma;
SCMCreateStreamReq *pCreateStreamReq;
SMDropStreamReq * pDropStreamReq;
const char * streamName;
const char * targetStbFullName;
} SCreateTSMACxt;
int32_t mndInitSma(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_SMA,
@ -382,6 +396,14 @@ static int32_t mndSetCreateSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *
return 0;
}
static int32_t mndSetCreateSmaUndoLogs(SMnode* pMnode, STrans* pTrans, SSmaObj* pSma) {
SSdbRaw * pUndoRaw = mndSmaActionEncode(pSma);
if (!pUndoRaw) return -1;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);
if (pCommitRaw == NULL) return -1;
@ -694,9 +716,7 @@ static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
static void mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
SName n;
tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
streamName[0] = '1';
streamName[1] = '.';
strcpy(streamName + 2, tNameGetTableName(&n));
sprintf(streamName, "%d.%s", n.acctId, n.tname);
}
static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
@ -1391,31 +1411,116 @@ static void initStreamObj(SStreamObj *pStream, const char *streamName, const SMC
pStream->version = 1;
pStream->sql = taosStrdup(pCreateReq->sql);
pStream->smaId = pSma->uid;
//pStream->conf.watermark = 0;
//pStream->deleteMark = 0;
pStream->conf.watermark = 0;
pStream->deleteMark = 0;
pStream->conf.fillHistory = STREAM_FILL_HISTORY_ON;
pStream->conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
pStream->conf.triggerParam = 10000;
pStream->ast = taosStrdup(pSma->ast);
}
static int32_t mndCreateTSMATxnPrepare(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SMCreateSmaReq *pCreate,
SStreamObj *pStream, SSmaObj *pSma) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-tsma");
if (!pTrans) return TSDB_CODE_OUT_OF_MEMORY;
mndTransSetDbName(pTrans, pDb->name, NULL);
if ((mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER;
static void mndCreateTSMABuildStreamOpReq(SCreateTSMACxt *pCxt) {
tstrncpy(pCxt->pCreateStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
tstrncpy(pCxt->pCreateStreamReq->sourceDB, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
tstrncpy(pCxt->pCreateStreamReq->targetStbFullName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
pCxt->pCreateStreamReq->igExists = false;
pCxt->pCreateStreamReq->triggerType = STREAM_TRIGGER_MAX_DELAY;
pCxt->pCreateStreamReq->igExpired = false;
pCxt->pCreateStreamReq->fillHistory = STREAM_FILL_HISTORY_ON;
pCxt->pCreateStreamReq->maxDelay = 10000;
pCxt->pCreateStreamReq->watermark = 0;
pCxt->pCreateStreamReq->numOfTags = pCxt->pSrcStb->numOfTags;
pCxt->pCreateStreamReq->checkpointFreq = 0;
pCxt->pCreateStreamReq->createStb = 1;
pCxt->pCreateStreamReq->targetStbUid = 0;
pCxt->pCreateStreamReq->fillNullCols = NULL;
pCxt->pCreateStreamReq->igUpdate = 0;
// TODO what's this tiemstamp?
pCxt->pCreateStreamReq->lastTs = 1685959190000;
pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast);
pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql);
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCreate->name, pStream->name);
// construct tags
pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pSrcStb->numOfTags, sizeof(SField));
for (int32_t idx = 0; idx < pCxt->pSrcStb->numOfTags; ++idx) {
SField f = {0};
SSchema *pSchema = &pCxt->pSrcStb->pTags[idx];
f.bytes = pSchema->bytes;
f.type = pSchema->type;
f.flags = pSchema->flags;
tstrncpy(f.name, pSchema->name, TSDB_COL_NAME_LEN);
taosArrayPush(pCxt->pCreateStreamReq->pTags, &f);
}
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
tstrncpy(pCxt->pDropStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
pCxt->pDropStreamReq->igNotExists = false;
// TODO fill sql
pCxt->pDropStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql);
pCxt->pDropStreamReq->sqlLen = strlen(pCxt->pDropStreamReq->sql);
}
static int32_t mndCreateTSMASetCreateStreamRedoAction(SMnode* pMnode) {
return TSDB_CODE_SUCCESS;
}
static int32_t mndCreateTSMASetCreateStreamUndoAction(SMnode* pMnode) {
return TSDB_CODE_SUCCESS;
}
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
int32_t code = -1;
STransAction redoAction = {0};
STransAction undoAction = {0};
// TODO trans conflicting setting
STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "create-tsma");
if (!pTrans) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
if ((mndTransCheckConflict(pCxt->pMnode, pTrans)) != 0) goto _OVER;
mndTransSetParallel(pTrans);
mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCxt->pCreateSmaReq->name,
pCxt->pCreateStreamReq->name);
mndGetMnodeEpSet(pCxt->pMnode, &redoAction.epSet);
redoAction.acceptableCode = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
redoAction.msgType = TDMT_STREAM_CREATE;
redoAction.contLen = tSerializeSCMCreateStreamReq(0, 0, pCxt->pCreateStreamReq);
redoAction.pCont = taosMemoryCalloc(1, redoAction.contLen);
if (!redoAction.pCont) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (redoAction.contLen != tSerializeSCMCreateStreamReq(redoAction.pCont, redoAction.contLen, pCxt->pCreateStreamReq)) {
mError("sma: %s, failed to create due to create stream req encode failure", pCxt->pCreateSmaReq->name);
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
undoAction.epSet = redoAction.epSet;
undoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
undoAction.actionType = TDMT_STREAM_DROP;
undoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
undoAction.pCont = taosMemoryCalloc(1, undoAction.contLen);
if (!undoAction.pCont) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (undoAction.contLen != tSerializeSMDropStreamReq(undoAction.pCont, undoAction.contLen, pCxt->pDropStreamReq)) {
mError("sma: %s, failed to create due to drop stream req encode failure", pCxt->pCreateSmaReq->name);
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
if (mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
if (mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
if (mndTransAppendRedoAction(pTrans, &redoAction) != 0) goto _OVER;
if (mndTransAppendUndoAction(pTrans, &undoAction) != 0) goto _OVER;
if (mndTransPrepare(pCxt->pMnode, pTrans) != 0) goto _OVER;
if (mndScheduleStream(pMnode, pStream, 1685959190000) != 0) goto _OVER;
if (mndPersistStream(pMnode, pTrans, pStream) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = TSDB_CODE_SUCCESS;
_OVER:
@ -1423,60 +1528,29 @@ _OVER:
return code;
}
static int32_t mndCreateTSMA(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreateReq, SDbObj *pDb, SStbObj *pStb,
const char *streamName) {
SSmaObj pSma = {0};
SStreamObj pStream = {0};
initSMAObj(&pSma, pCreateReq, pStb, pDb);
initStreamObj(&pStream, streamName, pCreateReq, pDb, &pSma);
static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
int32_t code;
SSmaObj sma = {0};
SCMCreateStreamReq createStreamReq = {0};
SMDropStreamReq dropStreamReq = {0};
SNode* pAst = NULL;
if (nodesStringToNode(pCreateReq->ast, &pAst) < 0) {
terrno = TSDB_CODE_MND_INVALID_SMA_OPTION;
mError("tsma:%s, failed to create since parse ast error", pSma.name);
return -1;
}
pCxt->pSma = &sma;
initSMAObj(&sma, pCxt->pCreateSmaReq, pCxt->pSrcStb, pCxt->pDb);
pCxt->pCreateStreamReq = &createStreamReq;
pCxt->pDropStreamReq = &dropStreamReq;
mndCreateTSMABuildStreamOpReq(pCxt);
if (qExtractResultSchema(pAst, (int32_t *)&pStream.outputSchema.nCols, &pStream.outputSchema.pSchema) != 0) {
terrno = TSDB_CODE_MND_INVALID_SMA_OPTION;
mError("sma:%s, failed to create since extract result schema error", pSma.name);
return -1;
}
SQueryPlan* pPlan = NULL;
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = pStream.conf.trigger,
.watermark = pStream.conf.watermark,
.deleteMark = pStream.deleteMark,
};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
terrno = TSDB_CODE_MND_INVALID_SMA_OPTION;
mError("sma:%s, failed to create since create query plan error", pSma.name);
return -1;
}
if (nodesNodeToString((SNode *)pPlan, false, &pStream.physicalPlan, NULL) != 0) {
terrno = TSDB_CODE_MND_INVALID_SMA_OPTION;
mError("sma:%s, failed to create since save physcial plan error", pSma.name);
return -1;
}
if (pAst) nodesDestroyNode(pAst);
nodesDestroyNode((SNode*)pPlan);
int32_t code;
if (TSDB_CODE_SUCCESS != mndCreateTSMATxnPrepare(pMnode, pReq, pDb, pCreateReq, &pStream, &pSma)) {
if (TSDB_CODE_SUCCESS != mndCreateTSMATxnPrepare(pCxt)) {
code = -1;
goto _OVER;
} else {
mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreateReq->name,
pSma.uid, pSma.stbUid, pSma.dstTbUid, pSma.dstTbName, pSma.dstVgId);
mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d",
pCxt->pCreateSmaReq->name, sma.uid, sma.stbUid, sma.dstTbUid, sma.dstTbName, sma.dstVgId);
code = 0;
}
_OVER:
pCxt->pCreateStreamReq = NULL;
return code;
}
@ -1486,7 +1560,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
goto _OVER;
#endif
SMnode * pMnode = pReq->info.node;
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = -1;
SDbObj * pDb = NULL;
SStbObj * pStb = NULL;
SSmaObj * pSma = NULL;
@ -1510,7 +1584,18 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
}
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
mndGetStreamNameFromSmaName(streamName, createReq.name);
char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
SName smaName;
tNameFromString(&smaName, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
snprintf(streamTargetStbFullName, TSDB_TABLE_FNAME_LEN, "%s.tsma_result_stb", createReq.name);
SStbObj *pTargetStb = mndAcquireStb(pMnode, streamTargetStbFullName);
if (pTargetStb) {
mError("tsma: %s, failed to create since output stable already exists: %s", createReq.name,
streamTargetStbFullName);
goto _OVER;
}
pStream = mndAcquireStream(pMnode, streamName);
if (pStream != NULL) {
@ -1540,7 +1625,19 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
goto _OVER;
}
code = mndCreateTSMA(pMnode, pReq, &createReq, pDb, pStb, streamName);
SCreateTSMACxt cxt = {
.pMnode = pMnode,
.pCreateSmaReq = &createReq,
.pCreateStreamReq = NULL,
.streamName = streamName,
.targetStbFullName = streamTargetStbFullName,
.pDb = pDb,
.pRpcReq = pReq,
.pSma = NULL,
.pSrcStb = pStb,
};
code = mndCreateTSMA(&cxt);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:
@ -1554,10 +1651,42 @@ _OVER:
mndReleaseDb(pMnode, pDb);
tFreeSMCreateSmaReq(&createReq);
return TSDB_CODE_MND_SMA_ALREADY_EXIST;
return code;
}
static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = -1;
SMDropSmaReq dropReq = {0};
SSmaObj * pSma;
SDbObj * pDb;
SMnode * pMnode = pReq->info.node;
if (tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
pSma = mndAcquireSma(pMnode, dropReq.name);
if (!pSma && dropReq.igNotExists) {
code = 0;
goto _OVER;
}
if (!pSma) {
terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
goto _OVER;
}
pDb = mndAcquireDbBySma(pMnode, dropReq.name);
if (!pDb) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto _OVER;
}
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}
code = mndDropSma(pMnode, pReq, pDb, pSma);
code = TSDB_CODE_SUCCESS;
_OVER:
return code;
}

View File

@ -41,6 +41,10 @@ static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq);
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq);
@ -102,7 +106,13 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
// for msgs inside mnode
mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode);
mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);
mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
@ -2338,3 +2348,25 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
return 0;
}
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
int32_t code = mndProcessCreateStreamReq(pReq);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
pReq->info.rsp = rpcMallocCont(1);
pReq->info.rspLen = 1;
pReq->info.noResp = false;
pReq->code = code;
}
return code;
}
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
int32_t code = mndProcessDropStreamReq(pReq);
if (code != 0) {
pReq->info.rsp = rpcMallocCont(1);
pReq->info.rspLen = 1;
pReq->info.noResp = false;
pReq->code = code;
}
return code;
}

0
source/libs/parser/inc/sql.y Executable file → Normal file
View File

View File

@ -7434,6 +7434,7 @@ typedef struct SSampleAstInfo {
SNodeList* pPartitionByList;
STableMeta* pRollupTableMeta;
bool createSmaIndex;
SNodeList* pTags;
} SSampleAstInfo;
static int32_t buildTableForSampleAst(SSampleAstInfo* pInfo, SNode** pOutput) {
@ -7530,6 +7531,7 @@ static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, ch
code = buildProjectsForSampleAst(pInfo, &pSelect->pProjectionList);
}
if (TSDB_CODE_SUCCESS == code) {
TSWAP(pInfo->pTags, pSelect->pTags);
TSWAP(pSelect->pPartitionByList, pInfo->pPartitionByList);
code = buildIntervalForSampleAst(pInfo, &pSelect->pWindow);
}
@ -10353,8 +10355,17 @@ static int32_t translateShowCreateView(STranslateContext* pCxt, SShowCreateViewS
#endif
}
static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, char** pAst, int32_t* pLen, char** pExpr,
int32_t* pExprLen) {
SNode* createColumnNodeWithName(const char* name) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (!pCol) return NULL;
tstrncpy(pCol->colName, name, TSDB_COL_NAME_LEN);
tstrncpy(pCol->node.aliasName, name, TSDB_COL_NAME_LEN);
tstrncpy(pCol->node.userAlias, name, TSDB_COL_NAME_LEN);
return (SNode*)pCol;
}
static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq,
STableMeta* pTableMeta) {
int32_t code = TSDB_CODE_SUCCESS;
SSampleAstInfo info = {0};
info.createSmaIndex = true;
@ -10363,8 +10374,32 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, cha
info.pFuncs = nodesCloneList(pStmt->pOptions->pFuncs);
info.pInterval = nodesCloneNode(pStmt->pOptions->pInterval);
if (!info.pFuncs || !info.pInterval) code = TSDB_CODE_OUT_OF_MEMORY;
if (TSDB_CODE_SUCCESS == code) {
// append partition by tbname
SNode* pTbnameFunc = createTbnameFunction();
if (pTbnameFunc) {
nodesListMakeAppend(&info.pPartitionByList, pTbnameFunc);
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
// append partition by tags
int32_t numOfCols = pTableMeta->tableInfo.numOfColumns, numOfTags = pTableMeta->tableInfo.numOfTags;
for (int32_t idx = numOfCols; idx < numOfCols + numOfTags; ++idx) {
SNode* pTagCol = createColumnNodeWithName(pTableMeta->schema[idx].name);
if (!pTagCol) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
nodesListAppend(info.pPartitionByList, pTagCol);
nodesListMakeAppend(&info.pTags, nodesCloneNode(pTagCol));
}
}
if (code == TSDB_CODE_SUCCESS) {
code = buildSampleAst(pCxt, &info, pAst, pLen, pExpr, pExprLen);
code = buildSampleAst(pCxt, &info, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen);
}
clearSampleAstInfo(&info);
return code;
@ -10454,7 +10489,7 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
code = translateTSMAFuncs(pCxt, pStmt, pTableMeta);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildTSMAAst(pCxt, pStmt, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen);
code = buildTSMAAst(pCxt, pStmt, pReq, pTableMeta);
}
/*
if (TSDB_CODE_SUCCESS == code) {
@ -10489,6 +10524,20 @@ static int32_t translateCreateTSMA(STranslateContext* pCxt, SCreateTSMAStmt* pSt
return code;
}
static int32_t translateDropTSMA(STranslateContext* pCxt, SDropTSMAStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
pStmt->pReq = taosMemoryCalloc(1, sizeof(SMDropSmaReq));
if (!pStmt->pReq) code = TSDB_CODE_OUT_OF_MEMORY;
if (code == TSDB_CODE_SUCCESS) {
SName name;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), pStmt->pReq->name);
pStmt->pReq->igNotExists = pStmt->ignoreNotExists;
}
if (TSDB_CODE_SUCCESS == code)
code = buildCmdMsg(pCxt, TDMT_MND_DROP_TSMA, (FSerializeFunc)tSerializeSMDropSmaReq, pStmt->pReq);
return code;
}
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) {