fix(query): fix error in refactor.
This commit is contained in:
parent
0fe2686b63
commit
f74fad7e60
|
@ -33,7 +33,6 @@ SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pRe
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
|
ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
|
||||||
|
|
||||||
for (int32_t i = 0; i < blockNum; i++) {
|
for (int32_t i = 0; i < blockNum; i++) {
|
||||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
|
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
|
||||||
|
|
|
@ -130,9 +130,9 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
|
||||||
static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req){
|
static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req){
|
||||||
SRetrieveTableRsp* pRetrieve = NULL;
|
SRetrieveTableRsp* pRetrieve = NULL;
|
||||||
|
|
||||||
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
|
int32_t len = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
|
||||||
|
|
||||||
pRetrieve = taosMemoryCalloc(1, dataStrLen);
|
pRetrieve = taosMemoryCalloc(1, len);
|
||||||
if (pRetrieve == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (pRetrieve == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
@ -148,7 +148,6 @@ static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBl
|
||||||
pRetrieve->version = htobe64(pBlock->info.version);
|
pRetrieve->version = htobe64(pBlock->info.version);
|
||||||
|
|
||||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data+ PAYLOAD_PREFIX_LEN, numOfCols);
|
int32_t actualLen = blockEncode(pBlock, pRetrieve->data+ PAYLOAD_PREFIX_LEN, numOfCols);
|
||||||
|
|
||||||
SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen);
|
SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen);
|
||||||
|
|
||||||
int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN;
|
int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN;
|
||||||
|
@ -160,7 +159,7 @@ static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBl
|
||||||
req->srcNodeId = pTask->info.nodeId;
|
req->srcNodeId = pTask->info.nodeId;
|
||||||
req->srcTaskId = pTask->id.taskId;
|
req->srcTaskId = pTask->id.taskId;
|
||||||
req->pRetrieve = pRetrieve;
|
req->pRetrieve = pRetrieve;
|
||||||
req->retrieveLen = dataStrLen;
|
req->retrieveLen = len;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -700,7 +699,9 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
|
||||||
pRetrieve->payloadLen = htonl(payloadLen);
|
pRetrieve->payloadLen = htonl(payloadLen);
|
||||||
pRetrieve->compLen = htonl(payloadLen);
|
pRetrieve->compLen = htonl(payloadLen);
|
||||||
|
|
||||||
taosArrayPush(pReq->dataLen, &actualLen);
|
payloadLen += sizeof(SRetrieveTableRsp);
|
||||||
|
|
||||||
|
taosArrayPush(pReq->dataLen, &payloadLen);
|
||||||
taosArrayPush(pReq->data, &buf);
|
taosArrayPush(pReq->data, &buf);
|
||||||
|
|
||||||
pReq->totalLen += dataStrLen;
|
pReq->totalLen += dataStrLen;
|
||||||
|
|
Loading…
Reference in New Issue