feat(stream): drop stream
This commit is contained in:
parent
803249837a
commit
928f45709d
|
@ -1519,7 +1519,7 @@ typedef struct {
|
||||||
#define STREAM_TRIGGER_MAX_DELAY 3
|
#define STREAM_TRIGGER_MAX_DELAY 3
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_STREAM_FNAME_LEN];
|
||||||
char sourceDB[TSDB_DB_FNAME_LEN];
|
char sourceDB[TSDB_DB_FNAME_LEN];
|
||||||
char targetStbFullName[TSDB_TABLE_FNAME_LEN];
|
char targetStbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
int8_t igExists;
|
int8_t igExists;
|
||||||
|
@ -1539,7 +1539,7 @@ int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStrea
|
||||||
void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq);
|
void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_STREAM_FNAME_LEN];
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
char* sql;
|
char* sql;
|
||||||
char* executorMsg;
|
char* executorMsg;
|
||||||
|
|
|
@ -210,7 +210,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_TYPE_STR_MAX_LEN 32
|
#define TSDB_TYPE_STR_MAX_LEN 32
|
||||||
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||||
#define TSDB_TOPIC_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
#define TSDB_TOPIC_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||||
#define TSDB_STREAM_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
#define TSDB_STREAM_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||||
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
|
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
|
||||||
#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
|
#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
|
||||||
#define TSDB_COL_NAME_LEN 65
|
#define TSDB_COL_NAME_LEN 65
|
||||||
|
|
|
@ -540,26 +540,37 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_STREAM_FNAME_LEN];
|
char name[TSDB_STREAM_FNAME_LEN];
|
||||||
|
// ctl
|
||||||
|
SRWLatch lock;
|
||||||
|
// create info
|
||||||
|
int64_t createTime;
|
||||||
|
int64_t updateTime;
|
||||||
|
int32_t version;
|
||||||
|
// TODO remove it
|
||||||
|
int64_t smaId; // 0 for unused
|
||||||
|
// info
|
||||||
|
int64_t uid;
|
||||||
|
int8_t status;
|
||||||
|
// TODO remove it
|
||||||
|
int32_t vgNum;
|
||||||
|
// config
|
||||||
|
int8_t dropPolicy;
|
||||||
|
int8_t trigger;
|
||||||
|
int64_t triggerParam;
|
||||||
|
int64_t watermark;
|
||||||
|
// source and target
|
||||||
|
int64_t sourceDbUid;
|
||||||
|
int64_t targetDbUid;
|
||||||
char sourceDb[TSDB_DB_FNAME_LEN];
|
char sourceDb[TSDB_DB_FNAME_LEN];
|
||||||
char targetDb[TSDB_DB_FNAME_LEN];
|
char targetDb[TSDB_DB_FNAME_LEN];
|
||||||
char targetSTbName[TSDB_TABLE_FNAME_LEN];
|
char targetSTbName[TSDB_TABLE_FNAME_LEN];
|
||||||
int64_t targetStbUid;
|
int64_t targetStbUid;
|
||||||
int64_t createTime;
|
// assigned when scheduling
|
||||||
int64_t updateTime;
|
|
||||||
int64_t uid;
|
|
||||||
int64_t dbUid;
|
|
||||||
int32_t version;
|
|
||||||
int32_t vgNum;
|
|
||||||
SRWLatch lock;
|
|
||||||
int8_t status;
|
|
||||||
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
|
|
||||||
int32_t fixedSinkVgId; // 0 for shuffle
|
int32_t fixedSinkVgId; // 0 for shuffle
|
||||||
SVgObj fixedSinkVg;
|
SVgObj fixedSinkVg;
|
||||||
int64_t smaId; // 0 for unused
|
// transformation
|
||||||
int8_t trigger;
|
|
||||||
int64_t triggerParam;
|
|
||||||
int64_t watermark;
|
|
||||||
char* sql;
|
char* sql;
|
||||||
|
char* ast;
|
||||||
char* physicalPlan;
|
char* physicalPlan;
|
||||||
SArray* tasks; // SArray<SArray<SStreamTask>>
|
SArray* tasks; // SArray<SArray<SStreamTask>>
|
||||||
SSchemaWrapper outputSchema;
|
SSchemaWrapper outputSchema;
|
||||||
|
|
|
@ -27,8 +27,7 @@ void mndCleanupStb(SMnode *pMnode);
|
||||||
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
|
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
|
||||||
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
|
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
|
||||||
SSdbRaw *mndStbActionEncode(SStbObj *pStb);
|
SSdbRaw *mndStbActionEncode(SStbObj *pStb);
|
||||||
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbs, int32_t numOfStbs, void **ppRsp,
|
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbs, int32_t numOfStbs, void **ppRsp, int32_t *pRspLen);
|
||||||
int32_t *pRspLen);
|
|
||||||
int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs);
|
int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs);
|
||||||
|
|
||||||
int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate);
|
int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate);
|
||||||
|
@ -36,6 +35,9 @@ SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName);
|
||||||
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
|
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
|
||||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
|
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
|
||||||
|
|
||||||
|
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
|
||||||
|
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -33,6 +33,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||||
|
|
||||||
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans);
|
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans);
|
||||||
|
|
||||||
|
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
|
|
||||||
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
||||||
int32_t sz = 0;
|
int32_t sz = 0;
|
||||||
/*int32_t outputNameSz = 0;*/
|
|
||||||
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
|
||||||
|
@ -28,10 +27,12 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
||||||
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
|
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pObj->vgNum) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pObj->dropPolicy) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
|
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
|
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
|
||||||
|
@ -57,17 +58,6 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
|
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
|
||||||
|
|
||||||
#if 0
|
|
||||||
if (pObj->ColAlias != NULL) {
|
|
||||||
outputNameSz = taosArrayGetSize(pObj->ColAlias);
|
|
||||||
}
|
|
||||||
if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
|
|
||||||
for (int32_t i = 0; i < outputNameSz; i++) {
|
|
||||||
char *name = taosArrayGetP(pObj->ColAlias, i);
|
|
||||||
if (tEncodeCStr(pEncoder, name) < 0) return -1;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,10 +70,12 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||||
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pObj->vgNum) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pObj->dropPolicy) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
|
||||||
|
@ -112,21 +104,6 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
|
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
|
||||||
#if 0
|
|
||||||
int32_t outputNameSz;
|
|
||||||
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
|
|
||||||
if (outputNameSz != 0) {
|
|
||||||
pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *));
|
|
||||||
if (pObj->ColAlias == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (int32_t i = 0; i < outputNameSz; i++) {
|
|
||||||
char *name;
|
|
||||||
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
|
|
||||||
taosArrayPush(pObj->ColAlias, &name);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -131,7 +131,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
|
||||||
int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) {
|
int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) {
|
||||||
pTask->dispatchType = TASK_DISPATCH__NONE;
|
pTask->dispatchType = TASK_DISPATCH__NONE;
|
||||||
// sink
|
// sink
|
||||||
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
|
if (pStream->smaId != 0) {
|
||||||
pTask->sinkType = TASK_SINK__SMA;
|
pTask->sinkType = TASK_SINK__SMA;
|
||||||
pTask->smaSink.smaId = pStream->smaId;
|
pTask->smaSink.smaId = pStream->smaId;
|
||||||
} else {
|
} else {
|
||||||
|
@ -275,7 +275,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb
|
||||||
pTask->execType = TASK_EXEC__NONE;
|
pTask->execType = TASK_EXEC__NONE;
|
||||||
|
|
||||||
// sink
|
// sink
|
||||||
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
|
if (pStream->smaId != 0) {
|
||||||
pTask->sinkType = TASK_SINK__SMA;
|
pTask->sinkType = TASK_SINK__SMA;
|
||||||
pTask->smaSink.smaId = pStream->smaId;
|
pTask->smaSink.smaId = pStream->smaId;
|
||||||
} else {
|
} else {
|
||||||
|
@ -321,7 +321,7 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj*
|
||||||
pTask->execType = TASK_EXEC__NONE;
|
pTask->execType = TASK_EXEC__NONE;
|
||||||
|
|
||||||
// sink
|
// sink
|
||||||
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
|
if (pStream->smaId != 0) {
|
||||||
pTask->sinkType = TASK_SINK__SMA;
|
pTask->sinkType = TASK_SINK__SMA;
|
||||||
pTask->smaSink.smaId = pStream->smaId;
|
pTask->smaSink.smaId = pStream->smaId;
|
||||||
} else {
|
} else {
|
||||||
|
@ -399,7 +399,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
|
|
||||||
// exec
|
// exec
|
||||||
pFinalTask->execType = TASK_EXEC__PIPE;
|
pFinalTask->execType = TASK_EXEC__PIPE;
|
||||||
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid);
|
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
|
||||||
if (mndAssignTaskToVg(pMnode, pTrans, pFinalTask, plan, pVgroup) < 0) {
|
if (mndAssignTaskToVg(pMnode, pTrans, pFinalTask, plan, pVgroup) < 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
|
@ -420,7 +420,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
SVgObj* pVgroup;
|
SVgObj* pVgroup;
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
if (pVgroup->dbUid != pStream->dbUid) {
|
if (pVgroup->dbUid != pStream->sourceDbUid) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -463,7 +463,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
SVgObj* pVgroup;
|
SVgObj* pVgroup;
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
if (pVgroup->dbUid != pStream->dbUid) {
|
if (pVgroup->dbUid != pStream->sourceDbUid) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -520,6 +520,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
smaObj.createdTime = taosGetTimestampMs();
|
smaObj.createdTime = taosGetTimestampMs();
|
||||||
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
|
ASSERT(smaObj.uid != 0);
|
||||||
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
|
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
|
||||||
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name);
|
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name);
|
||||||
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
|
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
|
@ -556,10 +557,9 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
streamObj.createTime = taosGetTimestampMs();
|
streamObj.createTime = taosGetTimestampMs();
|
||||||
streamObj.updateTime = streamObj.createTime;
|
streamObj.updateTime = streamObj.createTime;
|
||||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||||
streamObj.dbUid = pDb->uid;
|
streamObj.sourceDbUid = pDb->uid;
|
||||||
streamObj.version = 1;
|
streamObj.version = 1;
|
||||||
streamObj.sql = pCreate->sql;
|
streamObj.sql = pCreate->sql;
|
||||||
streamObj.createdBy = STREAM_CREATED_BY__SMA;
|
|
||||||
streamObj.smaId = smaObj.uid;
|
streamObj.smaId = smaObj.uid;
|
||||||
streamObj.watermark = 0;
|
streamObj.watermark = 0;
|
||||||
streamObj.trigger = STREAM_TRIGGER_AT_ONCE;
|
streamObj.trigger = STREAM_TRIGGER_AT_ONCE;
|
||||||
|
|
|
@ -28,7 +28,6 @@
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
#include "mndSma.h"
|
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
|
||||||
#define STB_VER_NUMBER 1
|
#define STB_VER_NUMBER 1
|
||||||
|
@ -1271,7 +1270,8 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp, int32_t *smaVer) {
|
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp,
|
||||||
|
int32_t *smaVer) {
|
||||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
|
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
|
||||||
|
|
||||||
|
@ -1769,16 +1769,23 @@ int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndExtractTableName(char *tableId, char *name) {
|
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst) {
|
||||||
|
SName name = {0};
|
||||||
|
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
|
||||||
|
tNameGetFullDbName(&name, dst);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize) {
|
||||||
int32_t pos = -1;
|
int32_t pos = -1;
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
for (pos = 0; tableId[pos] != 0; ++pos) {
|
for (pos = 0; stbFullName[pos] != 0; ++pos) {
|
||||||
if (tableId[pos] == TS_PATH_DELIMITER[0]) num++;
|
if (stbFullName[pos] == TS_PATH_DELIMITER[0]) num++;
|
||||||
if (num == 2) break;
|
if (num == 2) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (num == 2) {
|
if (num == 2) {
|
||||||
strcpy(name, tableId + pos + 1);
|
tstrncpy(dst, stbFullName + pos + 1, dstSize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1808,7 +1815,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
mndExtractTableName(pStb->name, &stbName[VARSTR_HEADER_SIZE]);
|
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
|
||||||
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
|
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
|
||||||
|
|
||||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
|
|
@ -196,7 +196,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
|
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
|
||||||
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
|
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 ||
|
||||||
|
pCreate->targetStbFullName[0] == 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -232,6 +233,114 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
|
||||||
|
SNode *pAst = NULL;
|
||||||
|
|
||||||
|
mDebug("stream:%s to create", pCreate->name);
|
||||||
|
memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||||
|
pObj->createTime = taosGetTimestampMs();
|
||||||
|
pObj->updateTime = pObj->createTime;
|
||||||
|
pObj->version = 1;
|
||||||
|
pObj->smaId = 0;
|
||||||
|
|
||||||
|
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
|
||||||
|
pObj->status = 0;
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
pObj->dropPolicy = 0;
|
||||||
|
pObj->trigger = pCreate->triggerType;
|
||||||
|
pObj->triggerParam = pCreate->maxDelay;
|
||||||
|
pObj->watermark = pCreate->watermark;
|
||||||
|
|
||||||
|
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
|
||||||
|
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
|
||||||
|
if (pSourceDb == NULL) {
|
||||||
|
/*ASSERT(0);*/
|
||||||
|
mDebug("stream:%s failed to create, source db %s not exist", pCreate->name, pObj->sourceDb);
|
||||||
|
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pObj->sourceDbUid = pSourceDb->uid;
|
||||||
|
|
||||||
|
memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
|
||||||
|
|
||||||
|
SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
|
||||||
|
if (pTargetDb == NULL) {
|
||||||
|
mDebug("stream:%s failed to create, target db %s not exist", pCreate->name, pObj->targetDb);
|
||||||
|
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
|
||||||
|
pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
|
pObj->targetDbUid = pTargetDb->uid;
|
||||||
|
|
||||||
|
pObj->sql = pCreate->sql;
|
||||||
|
pObj->ast = pCreate->ast;
|
||||||
|
|
||||||
|
pCreate->sql = NULL;
|
||||||
|
pCreate->ast = NULL;
|
||||||
|
|
||||||
|
// deserialize ast
|
||||||
|
if (nodesStringToNode(pObj->ast, &pAst) < 0) {
|
||||||
|
/*ASSERT(0);*/
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// extract output schema from ast
|
||||||
|
if (qExtractResultSchema(pAst, (int32_t *)&pObj->outputSchema.nCols, &pObj->outputSchema.pSchema) != 0) {
|
||||||
|
/*ASSERT(0);*/
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SQueryPlan *pPlan = NULL;
|
||||||
|
SPlanContext cxt = {
|
||||||
|
.pAstRoot = pAst,
|
||||||
|
.topicQuery = false,
|
||||||
|
.streamQuery = true,
|
||||||
|
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
|
||||||
|
.watermark = pObj->watermark,
|
||||||
|
};
|
||||||
|
|
||||||
|
// using ast and param to build physical plan
|
||||||
|
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
||||||
|
/*ASSERT(0);*/
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// save physcial plan
|
||||||
|
if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) {
|
||||||
|
/*ASSERT(0);*/
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
FAIL:
|
||||||
|
if (pAst != NULL) nodesDestroyNode(pAst);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||||
|
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
||||||
|
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||||
|
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
||||||
|
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
|
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
|
||||||
SNode *pAst = NULL;
|
SNode *pAst = NULL;
|
||||||
|
|
||||||
|
@ -356,7 +465,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
||||||
ASSERT(pTask->nodeId != 0);
|
ASSERT(pTask->nodeId != 0);
|
||||||
|
|
||||||
// vnode
|
// vnode
|
||||||
if (pTask->nodeId > 0) {
|
/*if (pTask->nodeId > 0) {*/
|
||||||
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -373,7 +482,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
/*}*/
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -403,10 +512,9 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
|
||||||
streamObj.updateTime = streamObj.createTime;
|
streamObj.updateTime = streamObj.createTime;
|
||||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||||
streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
|
streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
|
||||||
streamObj.dbUid = pDb->uid;
|
streamObj.sourceDbUid = pDb->uid;
|
||||||
streamObj.version = 1;
|
streamObj.version = 1;
|
||||||
streamObj.sql = pCreate->sql;
|
streamObj.sql = pCreate->sql;
|
||||||
streamObj.createdBy = STREAM_CREATED_BY__USER;
|
|
||||||
// TODO
|
// TODO
|
||||||
streamObj.fixedSinkVgId = 0;
|
streamObj.fixedSinkVgId = 0;
|
||||||
streamObj.smaId = 0;
|
streamObj.smaId = 0;
|
||||||
|
@ -486,7 +594,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO check auth
|
// TODO check read auth for source and write auth for target
|
||||||
|
#if 0
|
||||||
pDb = mndAcquireDb(pMnode, createStreamReq.sourceDB);
|
pDb = mndAcquireDb(pMnode, createStreamReq.sourceDB);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||||
|
@ -496,9 +605,52 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb);
|
// build stream obj from request
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
//
|
||||||
|
// schedule stream task for stream obj
|
||||||
|
//
|
||||||
|
SStreamObj streamObj = {0};
|
||||||
|
if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
mndTransSetDbName(pTrans, streamObj.sourceDb);
|
||||||
|
// TODO
|
||||||
|
/*mndTransSetDbName(pTrans, streamObj.targetDb);*/
|
||||||
|
mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
|
||||||
|
|
||||||
|
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) {
|
||||||
|
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) {
|
||||||
|
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
|
||||||
|
/*code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb);*/
|
||||||
|
/*if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;*/
|
||||||
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
@ -555,10 +707,54 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// drop stream
|
||||||
|
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
||||||
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
DROP_STREAM_OVER:
|
DROP_STREAM_OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
|
void *pIter = NULL;
|
||||||
|
SStreamObj *pStream = NULL;
|
||||||
|
while (1) {
|
||||||
|
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
|
||||||
|
if (pStream->sourceDbUid != pStream->targetDbUid) {
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
// TODO drop all task on snode
|
||||||
|
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
if (mndSetDropOffsetStreamLogs(pMnode, pTrans, pStream) < 0) {
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
|
static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||||
|
@ -574,7 +770,7 @@ static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfS
|
||||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
if (pStream->dbUid == pDb->uid) {
|
if (pStream->sourceDbUid == pDb->uid) {
|
||||||
numOfStreams++;
|
numOfStreams++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue