diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index d9293433ea..3fc65aaff1 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1849,11 +1849,12 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo } ret->length = htonl(ret->length); - taosArrayDestroy(tagArray); + taosArrayDestroy(tagArray); return ret; } -void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) { +void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, + int8_t needCompress) { int32_t* actualLen = (int32_t*)data; data += sizeof(int32_t); @@ -1947,4 +1948,4 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t ASSERT(pStart - pData == dataLen); return pStart; -} \ No newline at end of file +} diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index b5f7362689..48c43b0775 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -27,6 +27,8 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data); +int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 99c61e1479..91ede155bb 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -35,18 +35,19 @@ int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) { return 0; } -#if 1 int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - SStreamDataBlock* pBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); int8_t status; // enqueue - if (pBlock != NULL) { - pBlock->type = STREAM_DATA_TYPE_SSDATA_BLOCK; - pBlock->sourceVg = pReq->sourceVg; - pBlock->blocks = pReq->data; + if (pData != NULL) { + pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK; + pData->sourceVg = pReq->sourceVg; + // decode + /*pData->blocks = pReq->data;*/ /*pBlock->sourceVer = pReq->sourceVer;*/ - if (streamTaskInput(pTask, (SStreamQueueItem*)pBlock) == 0) { + streamDispatchReqToData(pReq, pData); + if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) { status = TASK_INPUT_STATUS__NORMAL; } else { status = TASK_INPUT_STATUS__FAILED; @@ -68,7 +69,6 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* tmsgSendRsp(pRsp); return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; } -#endif int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { // 1. handle input diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 95c0290058..7139e77407 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -36,6 +36,29 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) { } #endif +int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) { + int32_t blockNum = pReq->blockNum; + SArray* pArray = taosArrayInit(blockNum, sizeof(SSDataBlock)); + if (pArray == NULL) { + return -1; + } + taosArraySetSize(pArray, blockNum); + + ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data)); + ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen)); + + for (int32_t i = 0; i < blockNum; i++) { + int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i); + SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i); + SSDataBlock* pDataBlock = taosArrayGet(pArray, i); + blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); + // TODO: refactor + pDataBlock->info.childId = pReq->sourceChildId; + } + pData->blocks = pArray; + return 0; +} + SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) { SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); if (pDataSubmit == NULL) return NULL; diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 769d672042..e5f953b7cc 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -71,6 +71,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis pRetrieve->compressed = 0; pRetrieve->completed = 1; pRetrieve->numOfRows = htonl(pBlock->info.rows); + pRetrieve->numOfCols = htonl(pBlock->info.numOfCols); int32_t actualLen = 0; blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false);