diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index bcda85e7a7..309234179c 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -98,7 +98,8 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock taosArrayPush(pArray, &(SSDataBlock){0}); SRetrieveTableRsp* pRetrieve = pReq->pRetrieve; SSDataBlock* pDataBlock = taosArrayGet(pArray, 0); - blockDecode(pDataBlock, pRetrieve->data); + + blockDecode(pDataBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN); // TODO: refactor pDataBlock->info.window.skey = be64toh(pRetrieve->skey); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c9779919a4..28bd05d0c2 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -671,7 +671,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { } int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { - int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); + int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN; ASSERT(dataStrLen > 0); void* buf = taosMemoryCalloc(1, dataStrLen); @@ -694,8 +694,12 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch pRetrieve->numOfCols = htonl(numOfCols); int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); - actualLen += sizeof(SRetrieveTableRsp); - ASSERT(actualLen <= dataStrLen); + SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen); + + int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN; + pRetrieve->payloadLen = htonl(payloadLen); + pRetrieve->compLen = htonl(payloadLen); + taosArrayPush(pReq->dataLen, &actualLen); taosArrayPush(pReq->data, &buf);