fix(query): fix bugs caused by refactor.

This commit is contained in:
Haojun Liao 2024-05-19 13:32:30 +08:00
parent 217fb7551c
commit 1e5098f676
4 changed files with 19 additions and 11 deletions

View File

@ -2129,6 +2129,14 @@ typedef struct {
char data[];
} SRetrieveTableRsp;
#define PAYLOAD_PREFIX_LEN ((sizeof(int32_t)) << 1)
#define SET_PAYLOAD_LEN(_p, _compLen, _fullLen) \
do { \
((int32_t*)(_p))[0] = (_compLen); \
((int32_t*)(_p))[1] = (_fullLen); \
} while (0);
typedef struct {
int64_t version;
int64_t numOfRows;

View File

@ -20,14 +20,6 @@
#include "plannodes.h"
#include "tmsg.h"
#define PAYLOAD_PREFIX_LEN ((sizeof(int32_t)) << 1)
#define SET_PAYLOAD_LEN(_p, _compLen, _fullLen) \
do { \
((int32_t*)(_p))[0] = (_compLen); \
((int32_t*)(_p))[1] = (_fullLen); \
} while (0);
typedef struct SExplainCtx SExplainCtx;
int32_t qExecCommand(int64_t* pConnId, bool sysInfoUser, SNode *pStmt, SRetrieveTableRsp **pRsp, int8_t biMode);

View File

@ -1941,7 +1941,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
pBlock->info.rows = rowNum;
int32_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + sizeof(int32_t)*2;
int32_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
if (NULL == rsp) {

View File

@ -129,7 +129,8 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req){
SRetrieveTableRsp* pRetrieve = NULL;
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
pRetrieve = taosMemoryCalloc(1, dataStrLen);
if (pRetrieve == NULL) return TSDB_CODE_OUT_OF_MEMORY;
@ -146,7 +147,14 @@ static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBl
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data+ PAYLOAD_PREFIX_LEN, numOfCols);
SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen);
int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN;
pRetrieve->payloadLen = htonl(payloadLen);
pRetrieve->compLen = htonl(payloadLen);
pRetrieve->compressed = 0;
req->streamId = pTask->id.streamId;
req->srcNodeId = pTask->info.nodeId;