fix(stream): support none type in submit msg
This commit is contained in:
parent
f6b38f371e
commit
0964b07229
|
@ -25,7 +25,7 @@ int32_t init_env() {
|
|||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
@ -68,6 +68,14 @@ int32_t init_env() {
|
|||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table if not exists tu3 using st1 tags(3)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -90,10 +98,9 @@ int32_t create_stream() {
|
|||
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||
pRes = taos_query(
|
||||
pConn,
|
||||
"create stream stream1 trigger window_close watermark 10s into outstb as select _wstartts, sum(k) from st1 "
|
||||
"interval(10s) ");
|
||||
pRes = taos_query(pConn,
|
||||
"create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 "
|
||||
"partition by tbname interval(10s) ");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
|
|
@ -243,7 +243,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
|
|||
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
|
||||
break;
|
||||
}
|
||||
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
|
||||
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,7 +63,6 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
|||
}
|
||||
|
||||
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
|
||||
//
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1;
|
||||
|
@ -84,7 +83,7 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
|
|||
if (tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) return -1;
|
||||
uint64_t len = 0;
|
||||
if (tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len) < 0) return -1;
|
||||
pReq->retrieveLen = len;
|
||||
pReq->retrieveLen = (int32_t)len;
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue