diff --git a/example/src/tstream.c b/example/src/tstream.c index f42b5253db..766368475e 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -81,9 +81,11 @@ int32_t create_stream() { /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ - pRes = taos_query(pConn, "create stream stream1 as select min(k), max(k), sum(k) as sum_of_k from st1"); + pRes = taos_query( + pConn, + "create stream stream1 trigger window_close as select min(k), max(k), sum(k) as sum_of_k from tu1 interval(10m)"); if (taos_errno(pRes) != 0) { - printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes)); + printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 76bd6df9c2..93a20c3d45 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -667,7 +667,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { if (code != 0) goto FAIL; while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { - tscDebug("not ready, retry"); + tscDebug("consumer not ready, retry"); taosMsleep(500); } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 54f411d71f..03ac74aa62 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -418,6 +418,9 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; + if (tEncodeI32(pEncoder, pObj->triggerParam) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->waterMark) < 0) return -1; if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1; if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; @@ -464,6 +467,9 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; + if (tDecodeI32(pDecoder, &pObj->triggerParam) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->waterMark) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 935b01d3f5..5c6e2ce771 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -308,6 +308,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe streamObj.smaId = 0; /*streamObj.physicalPlan = "";*/ streamObj.logicalPlan = "not implemented"; + streamObj.trigger = pCreate->triggerType; + streamObj.waterMark = pCreate->watermark; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); if (pTrans == NULL) {