From 0e4748771acd90aceeee5141b094bbce64e7d1ed Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 5 Aug 2022 14:10:45 +0800 Subject: [PATCH] enh(query): support combine multi datablock to pass to upstream operators --- include/common/tdatablock.h | 2 +- source/client/src/clientImpl.c | 11 ++ source/common/src/tdatablock.c | 34 +++++- source/dnode/vnode/src/tsdb/tsdbRead.c | 7 +- source/libs/executor/inc/executorimpl.h | 9 +- source/libs/executor/src/dataDispatcher.c | 8 +- source/libs/executor/src/executorimpl.c | 142 +++++++++++++--------- source/libs/executor/src/scanoperator.c | 4 +- source/libs/stream/src/streamData.c | 5 +- 9 files changed, 142 insertions(+), 80 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 7839859e8b..3d605a144b 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -239,7 +239,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress); -const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData); +const char* blockDecode(SSDataBlock* pBlock, const char* pData); void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag); void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0c4dc1705c..8357e2d627 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1765,6 +1765,17 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 int32_t dataLen = *(int32_t*)p; p += sizeof(int32_t); + int32_t rows = *(int32_t*)p; + p += sizeof(int32_t); + + int32_t cols = *(int32_t*)p; + p += sizeof(int32_t); + + ASSERT(rows == numOfRows && cols == numOfCols); + + int32_t hasColumnSeg = *(int32_t*)p; + p += sizeof(int32_t); + uint64_t groupId = *(uint64_t*)p; p += sizeof(uint64_t); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 4ce60efaea..bcbdf8d786 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -676,9 +676,9 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) { * @return */ size_t blockDataGetSerialMetaSize(uint32_t numOfCols) { - // | total rows/total length | block group id | column schema | each column length | - return sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)) + - numOfCols * sizeof(int32_t); + // | total length | total rows | total columns | has column seg| block group id | column schema | each column length | + return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(uint64_t) + + numOfCols * (sizeof(int16_t) + sizeof(int32_t)) + numOfCols * sizeof(int32_t); } double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { @@ -2077,6 +2077,18 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_ int32_t* actualLen = (int32_t*)data; data += sizeof(int32_t); + int32_t* rows = (int32_t*)data; + *rows = pBlock->info.rows; + data += sizeof(int32_t); + + int32_t* cols = (int32_t*)data; + *cols = numOfCols; + data += sizeof(int32_t); + + int32_t* hasColumnSegment = (int32_t*)data; + *hasColumnSegment = 1; + data += sizeof(int32_t); + uint64_t* groupId = (uint64_t*)data; data += sizeof(uint64_t); @@ -2130,12 +2142,26 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_ *groupId = pBlock->info.groupId; } -const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) { +const char* blockDecode(SSDataBlock* pBlock, const char* pData) { const char* pStart = pData; + // total length sizeof(int32_t) int32_t dataLen = *(int32_t*)pStart; pStart += sizeof(int32_t); + // total rows sizeof(int32_t) + int32_t numOfRows = *(int32_t*)pStart; + pStart += sizeof(int32_t); + + // total columns sizeof(int32_t) + int32_t numOfCols = *(int32_t*)pStart; + pStart += sizeof(int32_t); + + // has column info segment + int32_t hasColumnInfo = *(int32_t*)pStart; + pStart += sizeof(int32_t); + + // group id sizeof(uint64_t) pBlock->info.groupId = *(uint64_t*)pStart; pStart += sizeof(uint64_t); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 7a867d85c9..2a8e234ac0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2714,7 +2714,6 @@ void tsdbReaderClose(STsdbReader* pReader) { } SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; - tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap); taosMemoryFreeClear(pSupInfo->plist); @@ -2742,10 +2741,8 @@ void tsdbReaderClose(STsdbReader* pReader) { SIOCostSummary* pCost = &pReader->cost; tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 - " SMA-time:%.2f ms, " - "fileBlocks:%" PRId64 - ", fileBlocks-time:%.2f ms, build in-memory-block-time:%.2f ms, STableBlockScanInfo " - "size:%.2f Kb %s", + " SMA-time:%.2f ms, fileBlocks:%" PRId64 ", fileBlocks-time:%.2f ms, " + "build in-memory-block-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s", pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime, pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2e0ed802e5..62b2cc0fa6 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -266,7 +266,11 @@ typedef struct SExchangeInfo { SArray* pSourceDataInfo; tsem_t ready; void* pTransporter; - SSDataBlock* pResult; + // SArray, result block list, used to keep the multi-block that + // passed by downstream operator + SArray* pResultBlockList; + int32_t rspBlockIndex; // indicate the return block index in pResultBlockList + SSDataBlock* pDummyBlock; // dummy block, not keep data bool seqLoadData; // sequential load data or not, false by default int32_t current; SLoadRemoteDataInfo loadInfo; @@ -855,8 +859,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWin int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, - int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, - SArray* pColList); + int32_t compLen, int32_t numOfOutput, uint64_t* total, SArray* pColList, char** pNextStart); STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index f0fb5852a0..3a37b5e760 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -69,10 +69,10 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { // clang-format off // data format: -// +----------------+--------------+-----------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+ -// |SDataCacheEntry | total length | group id | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes) -// | |sizeof(int32) |sizeof(uint64_t) |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | | -// +----------------+--------------+-----------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+ +// +----------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+ +// |SDataCacheEntry | total length | numOfRows | group id | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes) +// | |sizeof(int32) | sizeof(int32)| sizeof(uint64_t) |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | | +// +----------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+ // The length of bitmap is decided by number of rows of this data block, and the length of each column data is // recorded in the first segment, next to the struct header // clang-format on diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 23dbd4d992..8dfda896cc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1956,6 +1956,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { pRsp->compLen = htonl(pRsp->compLen); pRsp->numOfCols = htonl(pRsp->numOfCols); pRsp->useconds = htobe64(pRsp->useconds); + pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); ASSERT(pRsp != NULL); qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows); @@ -2043,12 +2044,10 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf } int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, - int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, - SArray* pColList) { + int32_t compLen, int32_t numOfOutput, uint64_t* total, SArray* pColList, char** pNextStart) { if (pColList == NULL) { // data from other sources blockDataCleanup(pRes); - // blockDataEnsureCapacity(pRes, numOfRows); - blockDecode(pRes, numOfOutput, numOfRows, pData); + *pNextStart = (char*) blockDecode(pRes, pData); } else { // extract data according to pColList ASSERT(numOfOutput == taosArrayGetSize(pColList)); char* pStart = pData; @@ -2072,7 +2071,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo blockDataAppendColInfo(pBlock, &idata); } - blockDecode(pBlock, numOfCols, numOfRows, pStart); + blockDecode(pBlock, pStart); blockDataEnsureCapacity(pRes, numOfRows); // data from mnode @@ -2084,8 +2083,6 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo // todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator. blockDataUpdateTsWindow(pRes, 0); - int64_t el = taosGetTimestampUs() - startTs; - pLoadInfo->totalRows += numOfRows; pLoadInfo->totalSize += compLen; @@ -2093,7 +2090,6 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo *total += numOfRows; } - pLoadInfo->totalElapsed += el; return TSDB_CODE_SUCCESS; } @@ -2115,8 +2111,8 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { return NULL; } -static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, - SExecTaskInfo* pTaskInfo) { +static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, + SExecTaskInfo* pTaskInfo) { int32_t code = 0; int64_t startTs = taosGetTimestampUs(); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); @@ -2142,7 +2138,6 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx SRetrieveTableRsp* pRsp = pDataInfo->pRsp; SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i); - SSDataBlock* pRes = pExchangeInfo->pResult; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 @@ -2155,29 +2150,37 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx continue; } - SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; - code = - extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, - pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); - if (code != 0) { - taosMemoryFreeClear(pDataInfo->pRsp); - goto _error; + SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; + int32_t index = 0; + char* pStart = pRetrieveRsp->data; + while(index++ < pRetrieveRsp->numOfBlocks) { + SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); + blockDataEnsureCapacity(pb, pRetrieveRsp->numOfRows); + code = + extractDataBlockFromFetchRsp(pb, pLoadInfo, pRetrieveRsp->numOfRows, pStart, + pRetrieveRsp->compLen, pRetrieveRsp->numOfCols, &pDataInfo->totalRows, NULL, &pStart); + if (code != 0) { + taosMemoryFreeClear(pDataInfo->pRsp); + goto _error; + } + + taosArrayPush(pExchangeInfo->pResultBlockList, &pb); } + pLoadInfo->totalElapsed += (taosGetTimestampUs() - startTs); + if (pRsp->completed == 1) { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d" - " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 - ", completed:%d try next %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRes->info.rows, - pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources); + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d" + " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", total:%.2f Kb," + " completed:%d try next %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfRows, + pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize/1024.0, completed + 1, i + 1, totalSources); completed += 1; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64 - ", totalBytes:%" PRIu64, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, - pLoadInfo->totalRows, pLoadInfo->totalSize); + ", total:%.2f Kb", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, + pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize/1024.0); } taosMemoryFreeClear(pDataInfo->pRsp); @@ -2191,11 +2194,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx } } - return pExchangeInfo->pResult; + return; } if (completed == totalSources) { - return setAllSourcesCompleted(pOperator, startTs); + setAllSourcesCompleted(pOperator, startTs); + return; } sched_yield(); @@ -2203,7 +2207,6 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx _error: pTaskInfo->code = code; - return NULL; } static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { @@ -2233,7 +2236,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { +static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2242,7 +2245,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { while (1) { if (pExchangeInfo->current >= totalSources) { - return setAllSourcesCompleted(pOperator, startTs); + setAllSourcesCompleted(pOperator, startTs); + return TSDB_CODE_SUCCESS; } doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); @@ -2255,7 +2259,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code)); pOperator->pTaskInfo->code = pDataInfo->code; - return NULL; + return pOperator->pTaskInfo->code; } SRetrieveTableRsp* pRsp = pDataInfo->pRsp; @@ -2272,16 +2276,16 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { continue; } - SSDataBlock* pRes = pExchangeInfo->pResult; - SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; - int32_t code = - extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, - pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); + SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; + + char* pStart = pRetrieveRsp->data; + int32_t code = extractDataBlockFromFetchRsp(NULL, pLoadInfo, pRetrieveRsp->numOfRows, pStart, pRetrieveRsp->compLen, + pRetrieveRsp->numOfCols, &pDataInfo->totalRows, NULL, &pStart); if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources); @@ -2290,13 +2294,13 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize); } - pOperator->resultInfo.totalRows += pRes->info.rows; + pOperator->resultInfo.totalRows += pRetrieveRsp->numOfRows; taosMemoryFreeClear(pDataInfo->pRsp); - return pExchangeInfo->pResult; + return TSDB_CODE_SUCCESS; } } @@ -2320,6 +2324,11 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } +static void freeBlock(void* pParam) { + SSDataBlock* pBlock = *(SSDataBlock**)pParam; + blockDataDestroy(pBlock); +} + static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2329,9 +2338,9 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) { return NULL; } - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); - SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pOperator->status == OP_EXEC_DONE) { qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize, @@ -2339,11 +2348,23 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) { return NULL; } - if (pExchangeInfo->seqLoadData) { - return seqLoadRemoteData(pOperator); - } else { - return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); + size_t size = taosArrayGetSize(pExchangeInfo->pResultBlockList); + if (size == 0 || pExchangeInfo->rspBlockIndex >= size) { + pExchangeInfo->rspBlockIndex = 0; + taosArrayClearEx(pExchangeInfo->pResultBlockList, freeBlock); + if (pExchangeInfo->seqLoadData) { + seqLoadRemoteData(pOperator); + } else { + concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); + } + + if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) { + return NULL; + } } + + // we have buffered retrieved datablock, return it directly + return taosArrayGetP(pExchangeInfo->pResultBlockList, pExchangeInfo->rspBlockIndex++); } static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { @@ -2360,26 +2381,24 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { return NULL; } - ASSERT(pBlock == pExchangeInfo->pResult); - SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo; if (hasLimitOffsetInfo(pLimitInfo)) { - int32_t status = handleLimitOffset(pOperator, pLimitInfo, pExchangeInfo->pResult, false); + int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false); if (status == PROJECT_RETRIEVE_CONTINUE) { continue; } else if (status == PROJECT_RETRIEVE_DONE) { - size_t rows = pExchangeInfo->pResult->info.rows; + size_t rows = pBlock->info.rows; pExchangeInfo->limitInfo.numOfOutputRows += rows; if (rows == 0) { doSetOperatorCompleted(pOperator); return NULL; } else { - return pExchangeInfo->pResult; + return pBlock; } } } else { - return pExchangeInfo->pResult; + return pBlock; } } } @@ -2442,16 +2461,18 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode } tsem_init(&pInfo->ready, 0, 0); + pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc); + pInfo->pResultBlockList = taosArrayInit(1, POINTER_BYTES); pInfo->seqLoadData = false; pInfo->pTransporter = pTransporter; - pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc); + pOperator->name = "ExchangeOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pResult->pDataBlock); + pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock); pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, @@ -3585,12 +3606,15 @@ void doDestroyExchangeOperatorInfo(void* param) { taosArrayDestroy(pExInfo->pSources); taosArrayDestroy(pExInfo->pSourceDataInfo); - if (pExInfo->pResult != NULL) { - pExInfo->pResult = blockDataDestroy(pExInfo->pResult); + + if (pExInfo->pResultBlockList != NULL) { + taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock); + pExInfo->pResultBlockList = NULL; } - tsem_destroy(&pExInfo->ready); + blockDataDestroy(pExInfo->pDummyBlock); + tsem_destroy(&pExInfo->ready); taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8545abecfb..03c08a6bf5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2266,9 +2266,11 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { } } + char* pStart = pRsp->data; extractDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen, - pOperator->exprSupp.numOfExprs, startTs, NULL, pInfo->scanCols); + pOperator->exprSupp.numOfExprs, NULL, pInfo->scanCols, &pStart); + //startTs, // todo log the filter info doFilterResult(pInfo); taosMemoryFree(pRsp); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 0bf6d4c921..c96854b198 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -27,10 +27,9 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen)); for (int32_t i = 0; i < blockNum; i++) { - /*int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);*/ SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i); - blockDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); + blockDecode(pDataBlock, pRetrieve->data); // TODO: refactor pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); @@ -51,7 +50,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock taosArraySetSize(pArray, 1); SRetrieveTableRsp* pRetrieve = pReq->pRetrieve; SSDataBlock* pDataBlock = taosArrayGet(pArray, 0); - blockDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); + blockDecode(pDataBlock, pRetrieve->data); // TODO: refactor pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);