fix
This commit is contained in:
parent
9063e480f1
commit
375814c0ca
|
@ -81,9 +81,11 @@ int32_t create_stream() {
|
||||||
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
||||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||||
pRes = taos_query(pConn, "create stream stream1 as select min(k), max(k), sum(k) as sum_of_k from st1");
|
pRes = taos_query(
|
||||||
|
pConn,
|
||||||
|
"create stream stream1 trigger window_close as select min(k), max(k), sum(k) as sum_of_k from tu1 interval(10m)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
|
@ -667,7 +667,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
if (code != 0) goto FAIL;
|
if (code != 0) goto FAIL;
|
||||||
|
|
||||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
|
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
|
||||||
tscDebug("not ready, retry");
|
tscDebug("consumer not ready, retry");
|
||||||
taosMsleep(500);
|
taosMsleep(500);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -418,6 +418,9 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
|
||||||
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
|
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
|
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
|
if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pObj->triggerParam) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pObj->waterMark) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
||||||
|
@ -464,6 +467,9 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
||||||
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pObj->triggerParam) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pObj->waterMark) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
|
||||||
|
|
|
@ -308,6 +308,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
||||||
streamObj.smaId = 0;
|
streamObj.smaId = 0;
|
||||||
/*streamObj.physicalPlan = "";*/
|
/*streamObj.physicalPlan = "";*/
|
||||||
streamObj.logicalPlan = "not implemented";
|
streamObj.logicalPlan = "not implemented";
|
||||||
|
streamObj.trigger = pCreate->triggerType;
|
||||||
|
streamObj.waterMark = pCreate->watermark;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue