sma
This commit is contained in:
parent
b7c31e9c06
commit
21a4b868e2
|
@ -31,6 +31,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
|
||||||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
||||||
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||||
|
|
||||||
|
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndStb.c"
|
#include "mndStb.c"
|
||||||
|
#include "mndStream.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
|
@ -404,6 +405,18 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
|
||||||
memcpy(smaObj.ast, pCreate->ast, smaObj.astLen);
|
memcpy(smaObj.ast, pCreate->ast, smaObj.astLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SStreamObj streamObj = {0};
|
||||||
|
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||||
|
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
streamObj.createTime = taosGetTimestampMs();
|
||||||
|
streamObj.updateTime = streamObj.createTime;
|
||||||
|
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||||
|
streamObj.dbUid = pDb->uid;
|
||||||
|
streamObj.version = 1;
|
||||||
|
streamObj.sql = pCreate->sql;
|
||||||
|
/*streamObj.physicalPlan = "";*/
|
||||||
|
streamObj.logicalPlan = "not implemented";
|
||||||
|
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
@ -414,6 +427,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
|
||||||
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||||
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||||
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
|
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
|
||||||
|
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -457,6 +471,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SStbObj *pStb = NULL;
|
SStbObj *pStb = NULL;
|
||||||
SSmaObj *pSma = NULL;
|
SSmaObj *pSma = NULL;
|
||||||
|
SStreamObj *pStream = NULL;
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
SUserObj *pUser = NULL;
|
SUserObj *pUser = NULL;
|
||||||
SMCreateSmaReq createReq = {0};
|
SMCreateSmaReq createReq = {0};
|
||||||
|
@ -476,6 +491,12 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) {
|
||||||
mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
|
mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pStream = mndAcquireStream(pMnode, createReq.name);
|
||||||
|
if (pStream != NULL) {
|
||||||
|
mError("sma:%s, failed to create since stream:%s already exist", createReq.name, createReq.name);
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
pSma = mndAcquireSma(pMnode, createReq.name);
|
pSma = mndAcquireSma(pMnode, createReq.name);
|
||||||
if (pSma != NULL) {
|
if (pSma != NULL) {
|
||||||
|
@ -514,6 +535,7 @@ _OVER:
|
||||||
|
|
||||||
mndReleaseStb(pMnode, pStb);
|
mndReleaseStb(pMnode, pStb);
|
||||||
mndReleaseSma(pMnode, pSma);
|
mndReleaseSma(pMnode, pSma);
|
||||||
|
mndReleaseStream(pMnode, pStream);
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
tFreeSMCreateSmaReq(&createReq);
|
tFreeSMCreateSmaReq(&createReq);
|
||||||
|
|
|
@ -218,13 +218,13 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **pStr) {
|
static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
|
||||||
if (NULL == pCreate->ast) {
|
if (NULL == ast) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode *pAst = NULL;
|
SNode *pAst = NULL;
|
||||||
int32_t code = nodesStringToNode(pCreate->ast, &pAst);
|
int32_t code = nodesStringToNode(ast, &pAst);
|
||||||
|
|
||||||
SQueryPlan *pPlan = NULL;
|
SQueryPlan *pPlan = NULL;
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -245,6 +245,28 @@ static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
|
||||||
|
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) {
|
||||||
|
mError("topic:%s, failed to get plan since %s", pStream->name, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndScheduleStream(pMnode, pTrans, pStream) < 0) {
|
||||||
|
mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSdbRaw *pRedoRaw = mndStreamActionEncode(pStream);
|
||||||
|
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) {
|
static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) {
|
||||||
mDebug("stream:%s to create", pCreate->name);
|
mDebug("stream:%s to create", pCreate->name);
|
||||||
SStreamObj streamObj = {0};
|
SStreamObj streamObj = {0};
|
||||||
|
@ -259,11 +281,6 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
||||||
/*streamObj.physicalPlan = "";*/
|
/*streamObj.physicalPlan = "";*/
|
||||||
streamObj.logicalPlan = "not implemented";
|
streamObj.logicalPlan = "not implemented";
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) {
|
|
||||||
mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("stream:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("stream:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
|
@ -271,20 +288,12 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
||||||
}
|
}
|
||||||
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
|
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
|
||||||
|
|
||||||
if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) {
|
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) {
|
||||||
mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr());
|
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj);
|
|
||||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
|
||||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
|
|
@ -156,7 +156,7 @@ void* MndTestSma::BuildCreateSmaReq(const char* smaname, const char* stbname, in
|
||||||
createReq.tagsFilterLen = strlen(createReq.tagsFilter) + 1;
|
createReq.tagsFilterLen = strlen(createReq.tagsFilter) + 1;
|
||||||
createReq.sql = (char*)sql;
|
createReq.sql = (char*)sql;
|
||||||
createReq.sqlLen = strlen(createReq.sql) + 1;
|
createReq.sqlLen = strlen(createReq.sql) + 1;
|
||||||
createReq.ast = (char*)expr;
|
createReq.ast = (char*)ast;
|
||||||
createReq.astLen = strlen(createReq.ast) + 1;
|
createReq.astLen = strlen(createReq.ast) + 1;
|
||||||
|
|
||||||
int32_t tlen = tSerializeSMCreateSmaReq(NULL, 0, &createReq);
|
int32_t tlen = tSerializeSMCreateSmaReq(NULL, 0, &createReq);
|
||||||
|
@ -201,7 +201,7 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
||||||
test.SendShowRetrieveReq();
|
test.SendShowRetrieveReq();
|
||||||
EXPECT_EQ(test.GetShowRows(), 1);
|
EXPECT_EQ(test.GetShowRows(), 1);
|
||||||
}
|
}
|
||||||
|
#if 0
|
||||||
{
|
{
|
||||||
pReq = BuildCreateSmaReq(smaname, stbname, 0, "expr", "tagsFilter", "sql", "ast", &contLen);
|
pReq = BuildCreateSmaReq(smaname, stbname, 0, "expr", "tagsFilter", "sql", "ast", &contLen);
|
||||||
pRsp = test.SendReq(TDMT_MND_CREATE_SMA, pReq, contLen);
|
pRsp = test.SendReq(TDMT_MND_CREATE_SMA, pReq, contLen);
|
||||||
|
@ -233,4 +233,5 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
||||||
test.SendShowRetrieveReq();
|
test.SendShowRetrieveReq();
|
||||||
EXPECT_EQ(test.GetShowRows(), 0);
|
EXPECT_EQ(test.GetShowRows(), 0);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue