fix(query): fix bugs caused by refactor.

This commit is contained in:
Haojun Liao 2024-05-19 13:40:09 +08:00
parent 1e5098f676
commit 82c10fd709
2 changed files with 9 additions and 4 deletions

View File

@ -98,7 +98,8 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
taosArrayPush(pArray, &(SSDataBlock){0});
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
SSDataBlock* pDataBlock = taosArrayGet(pArray, 0);
blockDecode(pDataBlock, pRetrieve->data);
blockDecode(pDataBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN);
// TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);

View File

@ -671,7 +671,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
}
int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
ASSERT(dataStrLen > 0);
void* buf = taosMemoryCalloc(1, dataStrLen);
@ -694,8 +694,12 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen);
int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN;
pRetrieve->payloadLen = htonl(payloadLen);
pRetrieve->compLen = htonl(payloadLen);
taosArrayPush(pReq->dataLen, &actualLen);
taosArrayPush(pReq->data, &buf);