stream support fix ep sink
This commit is contained in:
parent
98d8327ccf
commit
16f99d9916
|
@ -1,78 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_CONSUMER_H_
|
|
||||||
#define _TD_CONSUMER_H_
|
|
||||||
|
|
||||||
#include "tlist.h"
|
|
||||||
#include "tarray.h"
|
|
||||||
#include "hash.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
//consumer handle
|
|
||||||
struct tmq_consumer_t;
|
|
||||||
typedef struct tmq_consumer_t tmq_consumer_t;
|
|
||||||
|
|
||||||
//consumer config
|
|
||||||
struct tmq_consumer_config_t;
|
|
||||||
typedef struct tmq_consumer_config_t tmq_consumer_config_t;
|
|
||||||
|
|
||||||
//response err
|
|
||||||
struct tmq_resp_err_t;
|
|
||||||
typedef struct tmq_resp_err_t tmq_resp_err_t;
|
|
||||||
|
|
||||||
struct tmq_message_t;
|
|
||||||
typedef struct tmq_message_t tmq_message_t;
|
|
||||||
|
|
||||||
struct tmq_col_batch_t;
|
|
||||||
typedef struct tmq_col_batch_t tmq_col_batch_t;
|
|
||||||
|
|
||||||
//get content of message
|
|
||||||
tmq_col_batch_t* tmq_get_msg_col_by_idx(tmq_message_t*, int32_t col_id);
|
|
||||||
tmq_col_batch_t* tmq_get_msg_col_by_name(tmq_message_t*, const char*);
|
|
||||||
|
|
||||||
//consumer config
|
|
||||||
int32_t tmq_conf_set(tmq_consumer_config_t* , const char* config_key, const char* config_value, char* errstr, int32_t errstr_cap);
|
|
||||||
|
|
||||||
//consumer initialization
|
|
||||||
//resouces are supposed to be free by users by calling tmq_consumer_destroy
|
|
||||||
tmq_consumer_t* tmq_consumer_new(tmq_consumer_config_t* , char* errstr, int32_t errstr_cap);
|
|
||||||
|
|
||||||
//subscribe
|
|
||||||
tmq_resp_err_t tmq_subscribe(tmq_consumer_t*, const SList*);
|
|
||||||
tmq_resp_err_t tmq_unsubscribe(tmq_consumer_t*);
|
|
||||||
|
|
||||||
//consume
|
|
||||||
//resouces are supposed to be free by users by calling tmq_message_destroy
|
|
||||||
tmq_message_t* tmq_consume_poll(tmq_consumer_t*, int64_t blocking_time);
|
|
||||||
|
|
||||||
//destroy message and free memory
|
|
||||||
void tmq_message_destroy(tmq_message_t*);
|
|
||||||
|
|
||||||
//close consumer
|
|
||||||
int32_t tmq_consumer_close(tmq_consumer_t*);
|
|
||||||
|
|
||||||
//destroy consumer
|
|
||||||
void tmq_consumer_destroy(tmq_message_t*);
|
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /*_TD_CONSUMER_H_*/
|
|
|
@ -1,27 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_STREAM_H_
|
|
||||||
#define _TD_STREAM_H_
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /*_TD_STREAM_H_*/
|
|
|
@ -30,6 +30,11 @@ enum {
|
||||||
STREAM_TASK_STATUS__STOP,
|
STREAM_TASK_STATUS__STOP,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum {
|
||||||
|
STREAM_CREATED_BY__USER = 1,
|
||||||
|
STREAM_CREATED_BY__SMA,
|
||||||
|
};
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
// pipe -> fetch/pipe queue
|
// pipe -> fetch/pipe queue
|
||||||
// merge -> merge queue
|
// merge -> merge queue
|
||||||
|
|
|
@ -731,10 +731,10 @@ typedef struct {
|
||||||
int32_t vgNum;
|
int32_t vgNum;
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
int8_t status;
|
int8_t status;
|
||||||
int8_t sourceType;
|
|
||||||
int8_t sinkType;
|
|
||||||
// int32_t sqlLen;
|
// int32_t sqlLen;
|
||||||
int32_t sinkVgId; // 0 for automatic
|
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
|
||||||
|
int32_t fixedSinkVgId; // 0 for shuffle
|
||||||
|
int64_t smaId; // 0 for unused
|
||||||
char* sql;
|
char* sql;
|
||||||
char* logicalPlan;
|
char* logicalPlan;
|
||||||
char* physicalPlan;
|
char* physicalPlan;
|
||||||
|
|
|
@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode);
|
||||||
|
|
||||||
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
|
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
|
||||||
|
|
||||||
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId);
|
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
|
||||||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
||||||
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||||
|
|
||||||
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId);
|
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,8 +26,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
|
||||||
if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
|
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
|
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;
|
/*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/
|
||||||
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
||||||
// TODO encode tasks
|
// TODO encode tasks
|
||||||
if (pObj->tasks) {
|
if (pObj->tasks) {
|
||||||
|
@ -69,8 +72,11 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
||||||
if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;
|
/*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
|
||||||
pObj->tasks = NULL;
|
pObj->tasks = NULL;
|
||||||
int32_t sz;
|
int32_t sz;
|
||||||
|
|
|
@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
|
||||||
return pVgroup;
|
return pVgroup;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) {
|
int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
SSdb* pSdb = pMnode->pSdb;
|
SSdb* pSdb = pMnode->pSdb;
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
|
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
|
||||||
|
@ -151,9 +151,9 @@ int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream,
|
||||||
pTask->execType = TASK_EXEC__NONE;
|
pTask->execType = TASK_EXEC__NONE;
|
||||||
|
|
||||||
// sink
|
// sink
|
||||||
if (smaId != -1) {
|
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
|
||||||
pTask->sinkType = TASK_SINK__SMA;
|
pTask->sinkType = TASK_SINK__SMA;
|
||||||
pTask->smaSink.smaId = smaId;
|
pTask->smaSink.smaId = pStream->smaId;
|
||||||
} else {
|
} else {
|
||||||
pTask->sinkType = TASK_SINK__TABLE;
|
pTask->sinkType = TASK_SINK__TABLE;
|
||||||
}
|
}
|
||||||
|
@ -166,7 +166,45 @@ int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) {
|
int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
|
ASSERT(pStream->fixedSinkVgId != 0);
|
||||||
|
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
|
||||||
|
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
||||||
|
if (pTask == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taosArrayPush(tasks, &pTask);
|
||||||
|
|
||||||
|
pTask->nodeId = pStream->fixedSinkVgId;
|
||||||
|
SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
|
||||||
|
if (pVgroup == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
// source
|
||||||
|
pTask->sourceType = TASK_SOURCE__MERGE;
|
||||||
|
|
||||||
|
// exec
|
||||||
|
pTask->execType = TASK_EXEC__NONE;
|
||||||
|
|
||||||
|
// sink
|
||||||
|
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
|
||||||
|
pTask->sinkType = TASK_SINK__SMA;
|
||||||
|
pTask->smaSink.smaId = pStream->smaId;
|
||||||
|
} else {
|
||||||
|
pTask->sinkType = TASK_SINK__TABLE;
|
||||||
|
}
|
||||||
|
//
|
||||||
|
// dispatch
|
||||||
|
pTask->dispatchType = TASK_DISPATCH__NONE;
|
||||||
|
|
||||||
|
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
SSdb* pSdb = pMnode->pSdb;
|
SSdb* pSdb = pMnode->pSdb;
|
||||||
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
|
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
|
||||||
if (pPlan == NULL) {
|
if (pPlan == NULL) {
|
||||||
|
@ -185,7 +223,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
|
||||||
taosArrayPush(pStream->tasks, &taskOneLevel);
|
taosArrayPush(pStream->tasks, &taskOneLevel);
|
||||||
// add extra sink
|
// add extra sink
|
||||||
hasExtraSink = true;
|
hasExtraSink = true;
|
||||||
mndAddSinkToStream(pMnode, pTrans, pStream, smaId);
|
if (pStream->fixedSinkVgId == 0) {
|
||||||
|
mndAddShuffledSinkToStream(pMnode, pTrans, pStream);
|
||||||
|
} else {
|
||||||
|
mndAddFixedSinkToStream(pMnode, pTrans, pStream);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t level = 0; level < totLevel; level++) {
|
for (int32_t level = 0; level < totLevel; level++) {
|
||||||
|
@ -221,12 +263,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
|
||||||
pTask->sinkType = TASK_SINK__SHOW;
|
pTask->sinkType = TASK_SINK__SHOW;
|
||||||
pTask->showSink.reserved = 0;
|
pTask->showSink.reserved = 0;
|
||||||
if (!hasExtraSink) {
|
if (!hasExtraSink) {
|
||||||
if (smaId != -1) {
|
#if 1
|
||||||
|
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
|
||||||
pTask->sinkType = TASK_SINK__SMA;
|
pTask->sinkType = TASK_SINK__SMA;
|
||||||
pTask->smaSink.smaId = smaId;
|
pTask->smaSink.smaId = pStream->smaId;
|
||||||
} else {
|
} else {
|
||||||
pTask->sinkType = TASK_SINK__TABLE;
|
pTask->sinkType = TASK_SINK__TABLE;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pTask->sinkType = TASK_SINK__NONE;
|
pTask->sinkType = TASK_SINK__NONE;
|
||||||
|
@ -286,35 +330,47 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
|
||||||
|
|
||||||
if (hasExtraSink) {
|
if (hasExtraSink) {
|
||||||
// add dispatcher
|
// add dispatcher
|
||||||
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
if (pStream->fixedSinkVgId == 0) {
|
||||||
|
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
||||||
|
|
||||||
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
|
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
|
||||||
SDbObj* pDb = mndAcquireDb(pMnode, pStream->db);
|
SDbObj* pDb = mndAcquireDb(pMnode, pStream->db);
|
||||||
ASSERT(pDb);
|
ASSERT(pDb);
|
||||||
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
||||||
|
sdbRelease(pSdb, pDb);
|
||||||
|
qDestroyQueryPlan(pPlan);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
sdbRelease(pSdb, pDb);
|
sdbRelease(pSdb, pDb);
|
||||||
qDestroyQueryPlan(pPlan);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
sdbRelease(pSdb, pDb);
|
|
||||||
|
|
||||||
// put taskId to useDbRsp
|
// put taskId to useDbRsp
|
||||||
// TODO: optimize
|
// TODO: optimize
|
||||||
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
int32_t sz = taosArrayGetSize(pVgs);
|
int32_t sz = taosArrayGetSize(pVgs);
|
||||||
SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
|
SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
|
||||||
int32_t sinkLvSize = taosArrayGetSize(sinkLv);
|
int32_t sinkLvSize = taosArrayGetSize(sinkLv);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
|
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
|
||||||
for (int32_t j = 0; j < sinkLvSize; j++) {
|
for (int32_t j = 0; j < sinkLvSize; j++) {
|
||||||
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
|
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
|
||||||
/*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/
|
/*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/
|
||||||
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
|
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
|
||||||
pVgInfo->taskId = pLastLevelTask->taskId;
|
pVgInfo->taskId = pLastLevelTask->taskId;
|
||||||
/*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/
|
/*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
pTask->dispatchType = TASK_DISPATCH__FIXED;
|
||||||
|
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
|
||||||
|
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
|
||||||
|
// one sink only
|
||||||
|
ASSERT(taosArrayGetSize(pArray) == 1);
|
||||||
|
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
|
||||||
|
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
|
||||||
|
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
|
||||||
|
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -415,6 +415,10 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
|
||||||
streamObj.dbUid = pDb->uid;
|
streamObj.dbUid = pDb->uid;
|
||||||
streamObj.version = 1;
|
streamObj.version = 1;
|
||||||
streamObj.sql = pCreate->sql;
|
streamObj.sql = pCreate->sql;
|
||||||
|
streamObj.createdBy = STREAM_CREATED_BY__SMA;
|
||||||
|
// TODO
|
||||||
|
streamObj.fixedSinkVgId = 0;
|
||||||
|
streamObj.smaId = smaObj.uid;
|
||||||
/*streamObj.physicalPlan = "";*/
|
/*streamObj.physicalPlan = "";*/
|
||||||
streamObj.logicalPlan = "not implemented";
|
streamObj.logicalPlan = "not implemented";
|
||||||
|
|
||||||
|
@ -428,7 +432,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
|
||||||
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||||
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||||
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
|
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
|
||||||
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, smaObj.uid) != 0) goto _OVER;
|
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
|
@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId) {
|
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
|
||||||
SNode *pAst = NULL;
|
SNode *pAst = NULL;
|
||||||
|
|
||||||
if (nodesStringToNode(ast, &pAst) < 0) {
|
if (nodesStringToNode(ast, &pAst) < 0) {
|
||||||
|
@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndScheduleStream(pMnode, pTrans, pStream, smaId) < 0) {
|
if (mndScheduleStream(pMnode, pTrans, pStream) < 0) {
|
||||||
mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr());
|
mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -300,6 +300,10 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
||||||
streamObj.dbUid = pDb->uid;
|
streamObj.dbUid = pDb->uid;
|
||||||
streamObj.version = 1;
|
streamObj.version = 1;
|
||||||
streamObj.sql = pCreate->sql;
|
streamObj.sql = pCreate->sql;
|
||||||
|
streamObj.createdBy = STREAM_CREATED_BY__USER;
|
||||||
|
// TODO
|
||||||
|
streamObj.fixedSinkVgId = 0;
|
||||||
|
streamObj.smaId = 0;
|
||||||
/*streamObj.physicalPlan = "";*/
|
/*streamObj.physicalPlan = "";*/
|
||||||
streamObj.logicalPlan = "not implemented";
|
streamObj.logicalPlan = "not implemented";
|
||||||
|
|
||||||
|
@ -310,7 +314,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
||||||
}
|
}
|
||||||
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
|
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
|
||||||
|
|
||||||
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, -1) != 0) {
|
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) {
|
||||||
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
Loading…
Reference in New Issue