refactor(sma): create
This commit is contained in:
parent
928f45709d
commit
32d8e60ce2
|
@ -546,13 +546,10 @@ typedef struct {
|
|||
int64_t createTime;
|
||||
int64_t updateTime;
|
||||
int32_t version;
|
||||
// TODO remove it
|
||||
int64_t smaId; // 0 for unused
|
||||
// info
|
||||
int64_t uid;
|
||||
int8_t status;
|
||||
// TODO remove it
|
||||
int32_t vgNum;
|
||||
// config
|
||||
int8_t dropPolicy;
|
||||
int8_t trigger;
|
||||
|
@ -565,9 +562,10 @@ typedef struct {
|
|||
char targetDb[TSDB_DB_FNAME_LEN];
|
||||
char targetSTbName[TSDB_TABLE_FNAME_LEN];
|
||||
int64_t targetStbUid;
|
||||
// assigned when scheduling
|
||||
int32_t fixedSinkVgId; // 0 for shuffle
|
||||
// fixedSinkVg is not applicable for encode and decode
|
||||
SVgObj fixedSinkVg;
|
||||
|
||||
// transformation
|
||||
char* sql;
|
||||
char* ast;
|
||||
|
|
|
@ -32,6 +32,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
|
|||
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
||||
int64_t watermark, double filesFactor);
|
||||
|
||||
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
|
|||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
||||
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans);
|
||||
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
|
||||
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||
|
||||
|
|
|
@ -18,35 +18,35 @@
|
|||
#include "mndConsumer.h"
|
||||
|
||||
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
||||
int32_t sz = 0;
|
||||
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
||||
|
||||
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
|
||||
|
||||
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
|
||||
|
||||
if (tEncodeI8(pEncoder, pObj->dropPolicy) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
|
||||
|
||||
if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pObj->vgNum) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->dropPolicy) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
||||
/*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/
|
||||
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
||||
// TODO encode tasks
|
||||
if (pObj->tasks) {
|
||||
sz = taosArrayGetSize(pObj->tasks);
|
||||
}
|
||||
if (tEncodeI32(pEncoder, sz) < 0) return -1;
|
||||
|
||||
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->ast) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
||||
|
||||
int32_t sz = taosArrayGetSize(pObj->tasks);
|
||||
if (tEncodeI32(pEncoder, sz) < 0) return -1;
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SArray *pArray = taosArrayGetP(pObj->tasks, i);
|
||||
int32_t innerSz = taosArrayGetSize(pArray);
|
||||
|
@ -58,32 +58,38 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
|||
}
|
||||
|
||||
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
|
||||
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
||||
|
||||
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
|
||||
|
||||
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
||||
|
||||
if (tDecodeI8(pDecoder, &pObj->dropPolicy) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
|
||||
|
||||
if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->vgNum) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->dropPolicy) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
|
||||
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
|
||||
/*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
|
||||
|
||||
pObj->tasks = NULL;
|
||||
int32_t sz;
|
||||
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
|
||||
|
@ -104,6 +110,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
|||
}
|
||||
|
||||
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -346,8 +346,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
ASSERT(pStream->vgNum == 0);
|
||||
|
||||
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
|
||||
ASSERT(totLevel <= 2);
|
||||
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "mndDnode.h"
|
||||
#include "mndInfoSchema.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndScheduler.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndStream.h"
|
||||
|
@ -563,6 +564,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
streamObj.smaId = smaObj.uid;
|
||||
streamObj.watermark = 0;
|
||||
streamObj.trigger = STREAM_TRIGGER_AT_ONCE;
|
||||
streamObj.ast = strdup(smaObj.ast);
|
||||
|
||||
if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) {
|
||||
mError("sma:%s, failed to create since %s", smaObj.name, terrstr());
|
||||
|
@ -571,6 +573,39 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
smaObj.dstVgId = streamObj.fixedSinkVg.vgId;
|
||||
streamObj.fixedSinkVgId = smaObj.dstVgId;
|
||||
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(streamObj.ast, &pAst) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// extract output schema from ast
|
||||
if (qExtractResultSchema(pAst, (int32_t *)&streamObj.outputSchema.nCols, &streamObj.outputSchema.pSchema) != 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SQueryPlan *pPlan = NULL;
|
||||
SPlanContext cxt = {
|
||||
.pAstRoot = pAst,
|
||||
.topicQuery = false,
|
||||
.streamQuery = true,
|
||||
.triggerType = streamObj.trigger,
|
||||
.watermark = streamObj.watermark,
|
||||
};
|
||||
|
||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// save physcial plan
|
||||
if (nodesNodeToString((SNode *)pPlan, false, &streamObj.physicalPlan, NULL) != 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
if (pAst != NULL) nodesDestroyNode(pAst);
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
@ -585,7 +620,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
||||
// if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
|
||||
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER;
|
||||
if (mndScheduleStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
|
||||
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
|
|
@ -640,6 +640,12 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndPersistStream(pMnode, pTrans, &streamObj) < 0) {
|
||||
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
|
|
Loading…
Reference in New Issue