fix(stream): create stb in stream
This commit is contained in:
parent
3ae79a77c7
commit
861fde2508
|
@ -81,10 +81,10 @@ int32_t create_stream() {
|
||||||
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
||||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||||
pRes =
|
pRes = taos_query(
|
||||||
taos_query(pConn,
|
pConn,
|
||||||
"create stream stream1 trigger window_close as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
|
"create stream stream1 trigger window_close into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
|
||||||
"from tu1 interval(10m)");
|
"from tu1 interval(10m)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -601,7 +601,6 @@ typedef struct {
|
||||||
int32_t triggerParam;
|
int32_t triggerParam;
|
||||||
int64_t waterMark;
|
int64_t waterMark;
|
||||||
char* sql;
|
char* sql;
|
||||||
char* logicalPlan;
|
|
||||||
char* physicalPlan;
|
char* physicalPlan;
|
||||||
SArray* tasks; // SArray<SArray<SStreamTask>>
|
SArray* tasks; // SArray<SArray<SStreamTask>>
|
||||||
SSchemaWrapper outputSchema;
|
SSchemaWrapper outputSchema;
|
||||||
|
|
|
@ -419,7 +419,6 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
|
||||||
streamObj.fixedSinkVgId = smaObj.dstVgId;
|
streamObj.fixedSinkVgId = smaObj.dstVgId;
|
||||||
streamObj.smaId = smaObj.uid;
|
streamObj.smaId = smaObj.uid;
|
||||||
/*streamObj.physicalPlan = "";*/
|
/*streamObj.physicalPlan = "";*/
|
||||||
streamObj.logicalPlan = "not implemented";
|
|
||||||
|
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, &pReq->rpcMsg);
|
||||||
|
|
|
@ -290,7 +290,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SStbObj *mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
|
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
|
||||||
SStbObj *pStb = NULL;
|
SStbObj *pStb = NULL;
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
SUserObj *pUser = NULL;
|
SUserObj *pUser = NULL;
|
||||||
|
@ -301,7 +301,22 @@ static SStbObj *mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStr
|
||||||
createReq.numOfTags = 1; // group id
|
createReq.numOfTags = 1; // group id
|
||||||
createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField));
|
createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField));
|
||||||
// build fields
|
// build fields
|
||||||
|
taosArraySetSize(createReq.pColumns, createReq.numOfColumns);
|
||||||
|
for (int32_t i = 0; i < createReq.numOfColumns; i++) {
|
||||||
|
SField *pField = taosArrayGet(createReq.pColumns, i);
|
||||||
|
tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
|
||||||
|
pField->flags = pStream->outputSchema.pSchema[i].flags;
|
||||||
|
pField->type = pStream->outputSchema.pSchema[i].type;
|
||||||
|
pField->bytes = pStream->outputSchema.pSchema[i].bytes;
|
||||||
|
}
|
||||||
|
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
|
||||||
|
taosArraySetSize(createReq.pTags, 1);
|
||||||
// build tags
|
// build tags
|
||||||
|
SField *pField = taosArrayGet(createReq.pTags, 0);
|
||||||
|
strcpy(pField->name, "group_id");
|
||||||
|
pField->type = TSDB_DATA_TYPE_UBIGINT;
|
||||||
|
pField->flags = 0;
|
||||||
|
pField->bytes = 8;
|
||||||
|
|
||||||
if (mndCheckCreateStbReq(&createReq) != 0) {
|
if (mndCheckCreateStbReq(&createReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
@ -342,18 +357,14 @@ static SStbObj *mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStr
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndBuildStbFromReq(pMnode, pStb, &createReq, pDb) != 0) {
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
|
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
|
||||||
|
|
||||||
return pStb;
|
return 0;
|
||||||
_OVER:
|
_OVER:
|
||||||
mndReleaseStb(pMnode, pStb);
|
mndReleaseStb(pMnode, pStb);
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
return NULL;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) {
|
static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) {
|
||||||
|
@ -373,7 +384,6 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
||||||
streamObj.fixedSinkVgId = 0;
|
streamObj.fixedSinkVgId = 0;
|
||||||
streamObj.smaId = 0;
|
streamObj.smaId = 0;
|
||||||
/*streamObj.physicalPlan = "";*/
|
/*streamObj.physicalPlan = "";*/
|
||||||
streamObj.logicalPlan = "not implemented";
|
|
||||||
streamObj.trigger = pCreate->triggerType;
|
streamObj.trigger = pCreate->triggerType;
|
||||||
streamObj.waterMark = pCreate->watermark;
|
streamObj.waterMark = pCreate->watermark;
|
||||||
|
|
||||||
|
@ -384,16 +394,14 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
||||||
}
|
}
|
||||||
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
|
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
|
||||||
|
|
||||||
#if 0
|
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pCreate->triggerType, pCreate->watermark, pTrans) != 0) {
|
||||||
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->user) < 0) {
|
|
||||||
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pCreate->triggerType, pCreate->watermark, pTrans) != 0) {
|
if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->user) < 0) {
|
||||||
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to create stb for stream since %s", pTrans->id, terrstr());
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,10 @@ void walCloseReadHandle(SWalReadHandle *pRead) {
|
||||||
taosMemoryFree(pRead);
|
taosMemoryFree(pRead);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) { return 0; }
|
int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
|
||||||
|
// TODO
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
|
static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
Loading…
Reference in New Issue