Merge pull request #14225 from taosdata/feature/stream
enh(query): window included in serialized retrieve table rsp
This commit is contained in:
commit
e6f88b6286
|
@ -99,8 +99,8 @@ int32_t create_stream() {
|
||||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||||
pRes = taos_query(pConn,
|
pRes = taos_query(pConn,
|
||||||
"create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 "
|
"create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 partition "
|
||||||
"partition by tbname interval(10s) ");
|
"by tbname interval(10s) ");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -851,7 +851,6 @@ typedef struct {
|
||||||
int32_t tSerializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp);
|
int32_t tSerializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp);
|
||||||
int32_t tDeserializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp);
|
int32_t tDeserializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp);
|
||||||
|
|
||||||
|
|
||||||
typedef struct SQueryNodeAddr {
|
typedef struct SQueryNodeAddr {
|
||||||
int32_t nodeId; // vgId or qnodeId
|
int32_t nodeId; // vgId or qnodeId
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
|
@ -878,7 +877,6 @@ int32_t tSerializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp);
|
||||||
int32_t tDeserializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp);
|
int32_t tDeserializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp);
|
||||||
void tFreeSDnodeListRsp(SDnodeListRsp* pRsp);
|
void tFreeSDnodeListRsp(SDnodeListRsp* pRsp);
|
||||||
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SArray* pArray; // Array of SUseDbRsp
|
SArray* pArray; // Array of SUseDbRsp
|
||||||
} SUseDbBatchRsp;
|
} SUseDbBatchRsp;
|
||||||
|
@ -1258,7 +1256,6 @@ int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesR
|
||||||
|
|
||||||
void tFreeSShowVariablesRsp(SShowVariablesRsp* pRsp);
|
void tFreeSShowVariablesRsp(SShowVariablesRsp* pRsp);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* sql: show tables like '%a_%'
|
* sql: show tables like '%a_%'
|
||||||
* payload is the query condition, e.g., '%a_%'
|
* payload is the query condition, e.g., '%a_%'
|
||||||
|
@ -1308,6 +1305,8 @@ typedef struct {
|
||||||
int32_t compLen;
|
int32_t compLen;
|
||||||
int32_t numOfRows;
|
int32_t numOfRows;
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
|
int64_t skey;
|
||||||
|
int64_t ekey;
|
||||||
char data[];
|
char data[];
|
||||||
} SRetrieveTableRsp;
|
} SRetrieveTableRsp;
|
||||||
|
|
||||||
|
|
|
@ -634,7 +634,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
if (dropReq.igNotExists) {
|
if (dropReq.igNotExists) {
|
||||||
mDebug("stream:%s, not exist, ignore not exist is set", dropReq.name);
|
mDebug("stream:%s, not exist, ignore not exist is set", dropReq.name);
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
return -1;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -27,11 +27,14 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
|
||||||
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen));
|
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen));
|
||||||
|
|
||||||
for (int32_t i = 0; i < blockNum; i++) {
|
for (int32_t i = 0; i < blockNum; i++) {
|
||||||
int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);
|
/*int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);*/
|
||||||
SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i);
|
SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i);
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
|
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
|
||||||
blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
|
blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
|
||||||
// TODO: refactor
|
// TODO: refactor
|
||||||
|
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
|
||||||
|
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
|
||||||
|
|
||||||
pDataBlock->info.type = pRetrieve->streamBlockType;
|
pDataBlock->info.type = pRetrieve->streamBlockType;
|
||||||
pDataBlock->info.childId = pReq->upstreamChildId;
|
pDataBlock->info.childId = pReq->upstreamChildId;
|
||||||
}
|
}
|
||||||
|
@ -46,8 +49,14 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
|
||||||
}
|
}
|
||||||
taosArraySetSize(pArray, 1);
|
taosArraySetSize(pArray, 1);
|
||||||
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
|
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
|
||||||
SSDataBlock* pBlock = taosArrayGet(pArray, 0);
|
SSDataBlock* pDataBlock = taosArrayGet(pArray, 0);
|
||||||
blockCompressDecode(pBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
|
blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
|
||||||
|
// TODO: refactor
|
||||||
|
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
|
||||||
|
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
|
||||||
|
|
||||||
|
pDataBlock->info.type = pRetrieve->streamBlockType;
|
||||||
|
|
||||||
pData->blocks = pArray;
|
pData->blocks = pArray;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,6 +104,8 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
|
||||||
pRetrieve->streamBlockType = pBlock->info.type;
|
pRetrieve->streamBlockType = pBlock->info.type;
|
||||||
pRetrieve->numOfRows = htonl(pBlock->info.rows);
|
pRetrieve->numOfRows = htonl(pBlock->info.rows);
|
||||||
pRetrieve->numOfCols = htonl(numOfCols);
|
pRetrieve->numOfCols = htonl(numOfCols);
|
||||||
|
pRetrieve->skey = htobe64(pBlock->info.window.skey);
|
||||||
|
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
|
||||||
|
|
||||||
int32_t actualLen = 0;
|
int32_t actualLen = 0;
|
||||||
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
|
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
|
||||||
|
@ -171,6 +173,8 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
|
||||||
pRetrieve->completed = 1;
|
pRetrieve->completed = 1;
|
||||||
pRetrieve->streamBlockType = pBlock->info.type;
|
pRetrieve->streamBlockType = pBlock->info.type;
|
||||||
pRetrieve->numOfRows = htonl(pBlock->info.rows);
|
pRetrieve->numOfRows = htonl(pBlock->info.rows);
|
||||||
|
pRetrieve->skey = htobe64(pBlock->info.window.skey);
|
||||||
|
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
|
||||||
|
|
||||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
||||||
pRetrieve->numOfCols = htonl(numOfCols);
|
pRetrieve->numOfCols = htonl(numOfCols);
|
||||||
|
|
Loading…
Reference in New Issue