server handle tsma creation

This commit is contained in:
wangjiaming0909 2023-11-22 09:23:31 +08:00
parent d2106e0ade
commit 5589cf0309
7 changed files with 226 additions and 3 deletions

View File

@ -223,6 +223,8 @@
TD_DEF_MSG_TYPE(TDMT_MND_S3MIGRATE_DB, "s3migrate-db", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_S3MIGRATE_DB_TIMER, "s3migrate-db-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TSMA, "create-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TSMA, "drop-tsma", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8

View File

@ -164,6 +164,9 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_PAUSE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RESUME_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_WHITELIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -47,6 +47,9 @@ static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq);
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndDestroySmaObj(SSmaObj *pSmaObj);
static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq);
static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq);
// sma and tag index comm func
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
@ -72,6 +75,10 @@ int32_t mndInitSma(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelRetrieveIdx);
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq);
return sdbSetTable(pMnode->pSdb, table);
}
@ -1345,3 +1352,212 @@ static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
}
taosMemoryFree(p);
}
static void initSMAObj(SSmaObj *pSma, const SMCreateSmaReq *pCreateReq, const SStbObj *pStb, const SDbObj *pDb) {
memcpy(pSma->name, pCreateReq->name, TSDB_TABLE_FNAME_LEN);
memcpy(pSma->stb, pStb->name, TSDB_TABLE_FNAME_LEN);
memcpy(pSma->db, pDb->name, TSDB_DB_FNAME_LEN);
pSma->createdTime = taosGetTimestampMs();
pSma->uid = mndGenerateUid(pCreateReq->name, TSDB_TABLE_FNAME_LEN);
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreateReq->name);
memcpy(pSma->dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
pSma->dstTbUid = mndGenerateUid(pSma->dstTbName, TSDB_TABLE_FNAME_LEN);
pSma->stbUid = pStb->uid;
pSma->dbUid = pDb->uid;
pSma->interval = pCreateReq->interval;
pSma->intervalUnit = pCreateReq->intervalUnit;
pSma->timezone = tsTimezone;
pSma->exprLen = pCreateReq->exprLen;
pSma->sqlLen = pCreateReq->sqlLen;
pSma->astLen = pCreateReq->astLen;
pSma->expr = pCreateReq->expr;
pSma->sql = pCreateReq->sql;
pSma->ast = pCreateReq->ast;
}
static void initStreamObj(SStreamObj *pStream, const char *streamName, const SMCreateSmaReq *pCreateReq,
const SDbObj *pDb, SSmaObj *pSma) {
tstrncpy(pStream->name, streamName, TSDB_STREAM_FNAME_LEN);
tstrncpy(pStream->sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
tstrncpy(pStream->targetDb, pDb->name, TSDB_DB_FNAME_LEN);
pStream->createTime = taosGetTimestampMs();
pStream->updateTime = pStream->createTime;
pStream->uid = mndGenerateUid(streamName, strlen(streamName));
pStream->sourceDbUid = pDb->uid;
pStream->targetDbUid = pDb->uid;
pStream->version = 1;
pStream->sql = taosStrdup(pCreateReq->sql);
pStream->smaId = pSma->uid;
//pStream->conf.watermark = 0;
//pStream->deleteMark = 0;
pStream->conf.fillHistory = STREAM_FILL_HISTORY_ON;
pStream->conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
pStream->conf.triggerParam = 10000;
pStream->ast = taosStrdup(pSma->ast);
}
static int32_t mndCreateTSMATxnPrepare(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SMCreateSmaReq *pCreate,
SStreamObj *pStream, SSmaObj *pSma) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-tsma");
if (!pTrans) return TSDB_CODE_OUT_OF_MEMORY;
mndTransSetDbName(pTrans, pDb->name, NULL);
if ((mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER;
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCreate->name, pStream->name);
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
if (mndScheduleStream(pMnode, pStream, 1685959190000) != 0) goto _OVER;
if (mndPersistStream(pMnode, pTrans, pStream) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = TSDB_CODE_SUCCESS;
_OVER:
mndTransDrop(pTrans);
return code;
}
static int32_t mndCreateTSMA(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreateReq, SDbObj *pDb, SStbObj *pStb,
const char *streamName) {
SSmaObj pSma = {0};
SStreamObj pStream = {0};
initSMAObj(&pSma, pCreateReq, pStb, pDb);
initStreamObj(&pStream, streamName, pCreateReq, pDb, &pSma);
SNode* pAst = NULL;
if (nodesStringToNode(pCreateReq->ast, &pAst) < 0) {
terrno = TSDB_CODE_MND_INVALID_SMA_OPTION;
mError("tsma:%s, failed to create since parse ast error", pSma.name);
return -1;
}
if (qExtractResultSchema(pAst, (int32_t *)&pStream.outputSchema.nCols, &pStream.outputSchema.pSchema) != 0) {
terrno = TSDB_CODE_MND_INVALID_SMA_OPTION;
mError("sma:%s, failed to create since extract result schema error", pSma.name);
return -1;
}
SQueryPlan* pPlan = NULL;
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = pStream.conf.trigger,
.watermark = pStream.conf.watermark,
.deleteMark = pStream.deleteMark,
};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
terrno = TSDB_CODE_MND_INVALID_SMA_OPTION;
mError("sma:%s, failed to create since create query plan error", pSma.name);
return -1;
}
if (nodesNodeToString((SNode *)pPlan, false, &pStream.physicalPlan, NULL) != 0) {
terrno = TSDB_CODE_MND_INVALID_SMA_OPTION;
mError("sma:%s, failed to create since save physcial plan error", pSma.name);
return -1;
}
if (pAst) nodesDestroyNode(pAst);
nodesDestroyNode((SNode*)pPlan);
int32_t code;
if (TSDB_CODE_SUCCESS != mndCreateTSMATxnPrepare(pMnode, pReq, pDb, pCreateReq, &pStream, &pSma)) {
code = -1;
} else {
mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreateReq->name,
pSma.uid, pSma.stbUid, pSma.dstTbUid, pSma.dstTbName, pSma.dstVgId);
code = 0;
}
return code;
}
static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
#ifdef WINDOWS
terrno = TSDB_CODE_MND_INVALID_PLATFORM;
goto _OVER;
#endif
SMnode * pMnode = pReq->info.node;
int32_t code = TSDB_CODE_SUCCESS;
SDbObj * pDb = NULL;
SStbObj * pStb = NULL;
SSmaObj * pSma = NULL;
SStreamObj * pStream = NULL;
int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
SMCreateSmaReq createReq = {0};
if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
mInfo("start to create tsma: %s", createReq.name);
if (mndCheckCreateSmaReq(&createReq))
goto _OVER;
pStb = mndAcquireStb(pMnode, createReq.stb);
if (!pStb) {
mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
goto _OVER;
}
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
mndGetStreamNameFromSmaName(streamName, createReq.name);
pStream = mndAcquireStream(pMnode, streamName);
if (pStream != NULL) {
mError("tsma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
goto _OVER;
}
pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.name);
if (pSma && createReq.igExists) {
mInfo("tsma:%s, already exists in sma:%s, ignore exist is set", createReq.name, pSma->name);
code = 0;
goto _OVER;
}
if (pSma) {
terrno = TSDB_CODE_MND_SMA_ALREADY_EXIST;
goto _OVER;
}
pDb = mndAcquireDbBySma(pMnode, createReq.name);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
goto _OVER;
}
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}
code = mndCreateTSMA(pMnode, pReq, &createReq, pDb, pStb, streamName);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("tsma:%s, failed to create since %s", createReq.name, terrstr());
}
mndReleaseStb(pMnode, pStb);
mndReleaseSma(pMnode, pSma);
mndReleaseStream(pMnode, pStream);
mndReleaseDb(pMnode, pDb);
tFreeSMCreateSmaReq(&createReq);
return TSDB_CODE_MND_SMA_ALREADY_EXIST;
}
static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) {
int32_t code = TSDB_CODE_SUCCESS;
return code;
}

View File

@ -2848,7 +2848,7 @@ SNode* createCreateTSMAStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken*
pStmt->ignoreExists = ignoreExists;
pStmt->pOptions = (STSMAOptions*)pOptions;
pStmt->pOptions->pInterval = pInterval;
COPY_STRING_FORM_STR_TOKEN(pStmt->tsmaName, tsmaName);
COPY_STRING_FORM_ID_TOKEN(pStmt->tsmaName, tsmaName);
SRealTableNode* pTable = (SRealTableNode*)pRealTable;
memcpy(pStmt->dbName, pTable->table.dbName, TSDB_DB_NAME_LEN);

View File

@ -10482,7 +10482,8 @@ static int32_t translateCreateTSMA(STranslateContext* pCxt, SCreateTSMAStmt* pSt
code = buildCreateTSMAReq(pCxt, pStmt, pStmt->pReq);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_SMA, (FSerializeFunc)tSerializeSMCreateSmaReq, pStmt->pReq);
// TODO replace with tsma serialization func
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TSMA, (FSerializeFunc)tSerializeSMCreateSmaReq, pStmt->pReq);
}
tFreeSMCreateSmaReq(pStmt->pReq);
return code;

View File

@ -322,7 +322,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STREAMS, "Too many streams")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TARGET_TABLE, "Cannot write the same stable as other stream")
// mnode-sma
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "index already exists in db")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists in db")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "index not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma index option")

View File

@ -167,6 +167,7 @@ class TDTestCase:
def run(self):
self.check_setup_cluster_status()
sleep(9999999)
self.create_db_check_vgroups()
self.create_db_replica_3_insertdatas(self.db_name , self.replica , self.vgroups , self.tb_nums , self.row_nums)
self.check_insert_status(self.db_name , self.tb_nums , self.row_nums)