diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7c0e8206e1..01d99ac705 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1519,7 +1519,7 @@ typedef struct { #define STREAM_TRIGGER_MAX_DELAY 3 typedef struct { - char name[TSDB_TABLE_FNAME_LEN]; + char name[TSDB_STREAM_FNAME_LEN]; char sourceDB[TSDB_DB_FNAME_LEN]; char targetStbFullName[TSDB_TABLE_FNAME_LEN]; int8_t igExists; @@ -1539,7 +1539,7 @@ int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStrea void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq); typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; + char name[TSDB_STREAM_FNAME_LEN]; int64_t streamId; char* sql; char* executorMsg; diff --git a/include/util/tdef.h b/include/util/tdef.h index 4798a9375c..a48e823bad 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -210,7 +210,7 @@ typedef enum ELogicConditionType { #define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TOPIC_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) -#define TSDB_STREAM_FNAME_LEN TSDB_TABLE_FNAME_LEN +#define TSDB_STREAM_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2) #define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20) #define TSDB_COL_NAME_LEN 65 diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 35cbf178ec..db18f87830 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -539,27 +539,38 @@ typedef struct { } SMqRebOutputObj; typedef struct { - char name[TSDB_STREAM_FNAME_LEN]; - char sourceDb[TSDB_DB_FNAME_LEN]; - char targetDb[TSDB_DB_FNAME_LEN]; - char targetSTbName[TSDB_TABLE_FNAME_LEN]; - int64_t targetStbUid; - int64_t createTime; - int64_t updateTime; - int64_t uid; - int64_t dbUid; - int32_t version; - int32_t vgNum; - SRWLatch lock; - int8_t status; - int8_t createdBy; // STREAM_CREATED_BY__USER or SMA - int32_t fixedSinkVgId; // 0 for shuffle - SVgObj fixedSinkVg; - int64_t smaId; // 0 for unused - int8_t trigger; - int64_t triggerParam; - int64_t watermark; + char name[TSDB_STREAM_FNAME_LEN]; + // ctl + SRWLatch lock; + // create info + int64_t createTime; + int64_t updateTime; + int32_t version; + // TODO remove it + int64_t smaId; // 0 for unused + // info + int64_t uid; + int8_t status; + // TODO remove it + int32_t vgNum; + // config + int8_t dropPolicy; + int8_t trigger; + int64_t triggerParam; + int64_t watermark; + // source and target + int64_t sourceDbUid; + int64_t targetDbUid; + char sourceDb[TSDB_DB_FNAME_LEN]; + char targetDb[TSDB_DB_FNAME_LEN]; + char targetSTbName[TSDB_TABLE_FNAME_LEN]; + int64_t targetStbUid; + // assigned when scheduling + int32_t fixedSinkVgId; // 0 for shuffle + SVgObj fixedSinkVg; + // transformation char* sql; + char* ast; char* physicalPlan; SArray* tasks; // SArray> SSchemaWrapper outputSchema; diff --git a/source/dnode/mnode/impl/inc/mndStb.h b/source/dnode/mnode/impl/inc/mndStb.h index d3de4e3b3f..44a7fdadde 100644 --- a/source/dnode/mnode/impl/inc/mndStb.h +++ b/source/dnode/mnode/impl/inc/mndStb.h @@ -27,8 +27,7 @@ void mndCleanupStb(SMnode *pMnode); SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName); void mndReleaseStb(SMnode *pMnode, SStbObj *pStb); SSdbRaw *mndStbActionEncode(SStbObj *pStb); -int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbs, int32_t numOfStbs, void **ppRsp, - int32_t *pRspLen); +int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbs, int32_t numOfStbs, void **ppRsp, int32_t *pRspLen); int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs); int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate); @@ -36,6 +35,9 @@ SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName); int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb); int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb); +void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst); +void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b5d22cb7a5..65017027dc 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -33,6 +33,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); +int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 6eac91bb43..b9bd1e4298 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -19,7 +19,6 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { int32_t sz = 0; - /*int32_t outputNameSz = 0;*/ if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1; @@ -28,10 +27,12 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1; if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; + if (tEncodeI32(pEncoder, pObj->vgNum) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->dropPolicy) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; - if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1; if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1; if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1; @@ -57,17 +58,6 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { } if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1; - -#if 0 - if (pObj->ColAlias != NULL) { - outputNameSz = taosArrayGetSize(pObj->ColAlias); - } - if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1; - for (int32_t i = 0; i < outputNameSz; i++) { - char *name = taosArrayGetP(pObj->ColAlias, i); - if (tEncodeCStr(pEncoder, name) < 0) return -1; - } -#endif return pEncoder->pos; } @@ -80,10 +70,12 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; + if (tDecodeI32(pDecoder, &pObj->vgNum) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->dropPolicy) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; - if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1; @@ -112,21 +104,6 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { } if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1; -#if 0 - int32_t outputNameSz; - if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1; - if (outputNameSz != 0) { - pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *)); - if (pObj->ColAlias == NULL) { - return -1; - } - } - for (int32_t i = 0; i < outputNameSz; i++) { - char *name; - if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1; - taosArrayPush(pObj->ColAlias, &name); - } -#endif return 0; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index ffddab3a91..c53e97cd53 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -131,7 +131,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) { pTask->dispatchType = TASK_DISPATCH__NONE; // sink - if (pStream->createdBy == STREAM_CREATED_BY__SMA) { + if (pStream->smaId != 0) { pTask->sinkType = TASK_SINK__SMA; pTask->smaSink.smaId = pStream->smaId; } else { @@ -275,7 +275,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb pTask->execType = TASK_EXEC__NONE; // sink - if (pStream->createdBy == STREAM_CREATED_BY__SMA) { + if (pStream->smaId != 0) { pTask->sinkType = TASK_SINK__SMA; pTask->smaSink.smaId = pStream->smaId; } else { @@ -321,7 +321,7 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pTask->execType = TASK_EXEC__NONE; // sink - if (pStream->createdBy == STREAM_CREATED_BY__SMA) { + if (pStream->smaId != 0) { pTask->sinkType = TASK_SINK__SMA; pTask->smaSink.smaId = pStream->smaId; } else { @@ -399,7 +399,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // exec pFinalTask->execType = TASK_EXEC__PIPE; - SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid); + SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); if (mndAssignTaskToVg(pMnode, pTrans, pFinalTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); @@ -420,7 +420,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SVgObj* pVgroup; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pStream->dbUid) { + if (pVgroup->dbUid != pStream->sourceDbUid) { sdbRelease(pSdb, pVgroup); continue; } @@ -463,7 +463,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SVgObj* pVgroup; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pStream->dbUid) { + if (pVgroup->dbUid != pStream->sourceDbUid) { sdbRelease(pSdb, pVgroup); continue; } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 0f6bb74d9a..54fd0fe867 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -520,6 +520,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN); smaObj.createdTime = taosGetTimestampMs(); smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); + ASSERT(smaObj.uid != 0); char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0}; snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name); memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN); @@ -556,10 +557,9 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.createTime = taosGetTimestampMs(); streamObj.updateTime = streamObj.createTime; streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); - streamObj.dbUid = pDb->uid; + streamObj.sourceDbUid = pDb->uid; streamObj.version = 1; streamObj.sql = pCreate->sql; - streamObj.createdBy = STREAM_CREATED_BY__SMA; streamObj.smaId = smaObj.uid; streamObj.watermark = 0; streamObj.trigger = STREAM_TRIGGER_AT_ONCE; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index d4a38dd17b..b83aa34a3a 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -28,7 +28,6 @@ #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" -#include "mndSma.h" #include "tname.h" #define STB_VER_NUMBER 1 @@ -1271,7 +1270,8 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa return 0; } -static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp, int32_t *smaVer) { +static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp, + int32_t *smaVer) { char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName); @@ -1672,7 +1672,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - + for (int32_t i = 0; i < numOfStbs; ++i) { SSTableVersion *pStbVersion = &pStbVersions[i]; pStbVersion->suid = be64toh(pStbVersion->suid); @@ -1681,7 +1681,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t pStbVersion->smaVer = ntohl(pStbVersion->smaVer); STableMetaRsp metaRsp = {0}; - int32_t smaVer = 0; + int32_t smaVer = 0; mDebug("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName); if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp, &smaVer) != 0) { metaRsp.numOfColumns = -1; @@ -1697,15 +1697,15 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t } if (pStbVersion->smaVer && pStbVersion->smaVer != smaVer) { - bool exist = false; - char tbFName[TSDB_TABLE_FNAME_LEN]; + bool exist = false; + char tbFName[TSDB_TABLE_FNAME_LEN]; STableIndexRsp indexRsp = {0}; indexRsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo)); if (NULL == indexRsp.pIndex) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - + sprintf(tbFName, "%s.%s", pStbVersion->dbFName, pStbVersion->stbName); int32_t code = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist); if (code || !exist) { @@ -1769,16 +1769,23 @@ int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) { return 0; } -static void mndExtractTableName(char *tableId, char *name) { +void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst) { + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + + tNameGetFullDbName(&name, dst); +} + +void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize) { int32_t pos = -1; int32_t num = 0; - for (pos = 0; tableId[pos] != 0; ++pos) { - if (tableId[pos] == TS_PATH_DELIMITER[0]) num++; + for (pos = 0; stbFullName[pos] != 0; ++pos) { + if (stbFullName[pos] == TS_PATH_DELIMITER[0]) num++; if (num == 2) break; } if (num == 2) { - strcpy(name, tableId + pos + 1); + tstrncpy(dst, stbFullName + pos + 1, dstSize); } } @@ -1808,7 +1815,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc SName name = {0}; char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - mndExtractTableName(pStb->name, &stbName[VARSTR_HEADER_SIZE]); + mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN); varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE])); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 491835fc15..de0b25b1dc 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -196,7 +196,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { } static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { - if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) { + if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 || + pCreate->targetStbFullName[0] == 0) { terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION; return -1; } @@ -232,6 +233,114 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 return code; } +static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) { + SNode *pAst = NULL; + + mDebug("stream:%s to create", pCreate->name); + memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN); + pObj->createTime = taosGetTimestampMs(); + pObj->updateTime = pObj->createTime; + pObj->version = 1; + pObj->smaId = 0; + + pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name)); + pObj->status = 0; + + // TODO + pObj->dropPolicy = 0; + pObj->trigger = pCreate->triggerType; + pObj->triggerParam = pCreate->maxDelay; + pObj->watermark = pCreate->watermark; + + memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN); + SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); + if (pSourceDb == NULL) { + /*ASSERT(0);*/ + mDebug("stream:%s failed to create, source db %s not exist", pCreate->name, pObj->sourceDb); + terrno = TSDB_CODE_MND_DB_NOT_EXIST; + return -1; + } + pObj->sourceDbUid = pSourceDb->uid; + + memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN); + + SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName); + if (pTargetDb == NULL) { + mDebug("stream:%s failed to create, target db %s not exist", pCreate->name, pObj->targetDb); + terrno = TSDB_CODE_MND_DB_NOT_EXIST; + return -1; + } + tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN); + + pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN); + pObj->targetDbUid = pTargetDb->uid; + + pObj->sql = pCreate->sql; + pObj->ast = pCreate->ast; + + pCreate->sql = NULL; + pCreate->ast = NULL; + + // deserialize ast + if (nodesStringToNode(pObj->ast, &pAst) < 0) { + /*ASSERT(0);*/ + goto FAIL; + } + + // extract output schema from ast + if (qExtractResultSchema(pAst, (int32_t *)&pObj->outputSchema.nCols, &pObj->outputSchema.pSchema) != 0) { + /*ASSERT(0);*/ + goto FAIL; + } + + SQueryPlan *pPlan = NULL; + SPlanContext cxt = { + .pAstRoot = pAst, + .topicQuery = false, + .streamQuery = true, + .triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger, + .watermark = pObj->watermark, + }; + + // using ast and param to build physical plan + if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { + /*ASSERT(0);*/ + goto FAIL; + } + + // save physcial plan + if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) { + /*ASSERT(0);*/ + goto FAIL; + } + +FAIL: + if (pAst != NULL) nodesDestroyNode(pAst); + return 0; +} + +int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + return 0; +} + +int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); + return 0; +} + int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { SNode *pAst = NULL; @@ -356,24 +465,24 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { ASSERT(pTask->nodeId != 0); // vnode - if (pTask->nodeId > 0) { - SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); - if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pReq->head.vgId = htonl(pTask->nodeId); - pReq->taskId = pTask->taskId; - STransAction action = {0}; - memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); - action.pCont = pReq; - action.contLen = sizeof(SVDropStreamTaskReq); - action.msgType = TDMT_VND_STREAM_TASK_DROP; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } + /*if (pTask->nodeId > 0) {*/ + SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } + pReq->head.vgId = htonl(pTask->nodeId); + pReq->taskId = pTask->taskId; + STransAction action = {0}; + memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); + action.pCont = pReq; + action.contLen = sizeof(SVDropStreamTaskReq); + action.msgType = TDMT_VND_STREAM_TASK_DROP; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + /*}*/ return 0; } @@ -403,10 +512,9 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq streamObj.updateTime = streamObj.createTime; streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN); - streamObj.dbUid = pDb->uid; + streamObj.sourceDbUid = pDb->uid; streamObj.version = 1; streamObj.sql = pCreate->sql; - streamObj.createdBy = STREAM_CREATED_BY__USER; // TODO streamObj.fixedSinkVgId = 0; streamObj.smaId = 0; @@ -486,7 +594,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - // TODO check auth + // TODO check read auth for source and write auth for target +#if 0 pDb = mndAcquireDb(pMnode, createStreamReq.sourceDB); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; @@ -496,9 +605,52 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } +#endif - code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb); - if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + // build stream obj from request + // + // schedule stream task for stream obj + // + SStreamObj streamObj = {0}; + if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) { + ASSERT(0); + mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); + goto _OVER; + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq); + if (pTrans == NULL) { + mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); + goto _OVER; + } + mndTransSetDbName(pTrans, streamObj.sourceDb); + // TODO + /*mndTransSetDbName(pTrans, streamObj.targetDb);*/ + mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); + + if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) { + mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr()); + mndTransDrop(pTrans); + goto _OVER; + } + + if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) { + mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr()); + mndTransDrop(pTrans); + goto _OVER; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + goto _OVER; + } + + mndTransDrop(pTrans); + + /*code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb);*/ + /*if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;*/ + code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { @@ -555,10 +707,54 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return code; } + // drop stream + if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + DROP_STREAM_OVER: return code; } +int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + SStreamObj *pStream = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) break; + + if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) { + if (pStream->sourceDbUid != pStream->targetDbUid) { + sdbRelease(pSdb, pStream); + return -1; + } else { + // TODO drop all task on snode + if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { + sdbRelease(pSdb, pStream); + return -1; + } + } + } else { + sdbRelease(pSdb, pStream); + continue; + } + +#if 0 + if (mndSetDropOffsetStreamLogs(pMnode, pTrans, pStream) < 0) { + sdbRelease(pSdb, pStream); + goto END; + } +#endif + + sdbRelease(pSdb, pStream); + } + + return 0; +} + static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) { SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); @@ -574,7 +770,7 @@ static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfS pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; - if (pStream->dbUid == pDb->uid) { + if (pStream->sourceDbUid == pDb->uid) { numOfStreams++; }