fix(query): fix bugs caused by refactor.

This commit is contained in:
Haojun Liao 2024-05-19 14:55:51 +08:00
parent 7a90e68667
commit 0fe2686b63
2 changed files with 20 additions and 4 deletions

View File

@ -343,13 +343,13 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
// Got data from sink
QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64 "", len);
*dataLen += len + sizeof(int32_t) * 2;
*pRawDataLen += rawLen + sizeof(int32_t) * 2;
*dataLen += len + PAYLOAD_PREFIX_LEN;
*pRawDataLen += rawLen + PAYLOAD_PREFIX_LEN;
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &pRsp));
// set the serialize start position
output.pData = pRsp->data + *dataLen - (len + sizeof(int32_t) * 2);
output.pData = pRsp->data + *dataLen - (len + PAYLOAD_PREFIX_LEN);
((int32_t*) output.pData)[0] = len;
((int32_t*) output.pData)[1] = rawLen;

View File

@ -37,7 +37,23 @@ SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pRe
for (int32_t i = 0; i < blockNum; i++) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
blockDecode(pDataBlock, pRetrieve->data);
int32_t compLen = *(int32_t*)pRetrieve->data;
int32_t fullLen = *(int32_t*)(pRetrieve->data + sizeof(int32_t));
char* pInput = pRetrieve->data + PAYLOAD_PREFIX_LEN;
if (pRetrieve->compressed && compLen < fullLen) {
char* p = taosMemoryMalloc(fullLen);
int32_t len = tsDecompressString(pInput, compLen, 1, p, fullLen, ONE_STAGE_COMP, NULL, 0);
ASSERT(len == fullLen);
pInput = p;
}
blockDecode(pDataBlock, pInput);
if (pRetrieve->compressed && compLen < fullLen) {
taosMemoryFree(pInput);
}
// TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);