enh(query): support combine multi datablock to pass to upstream operators

This commit is contained in:
Haojun Liao 2022-08-05 14:10:45 +08:00
parent ead033e9a1
commit 0e4748771a
9 changed files with 142 additions and 80 deletions

View File

@ -239,7 +239,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index);
void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress);
const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
const char* blockDecode(SSDataBlock* pBlock, const char* pData);
void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag);
void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);

View File

@ -1765,6 +1765,17 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
int32_t dataLen = *(int32_t*)p;
p += sizeof(int32_t);
int32_t rows = *(int32_t*)p;
p += sizeof(int32_t);
int32_t cols = *(int32_t*)p;
p += sizeof(int32_t);
ASSERT(rows == numOfRows && cols == numOfCols);
int32_t hasColumnSeg = *(int32_t*)p;
p += sizeof(int32_t);
uint64_t groupId = *(uint64_t*)p;
p += sizeof(uint64_t);

View File

@ -676,9 +676,9 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
* @return
*/
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
// | total rows/total length | block group id | column schema | each column length |
return sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)) +
numOfCols * sizeof(int32_t);
// | total length | total rows | total columns | has column seg| block group id | column schema | each column length |
return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(uint64_t) +
numOfCols * (sizeof(int16_t) + sizeof(int32_t)) + numOfCols * sizeof(int32_t);
}
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
@ -2077,6 +2077,18 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
int32_t* actualLen = (int32_t*)data;
data += sizeof(int32_t);
int32_t* rows = (int32_t*)data;
*rows = pBlock->info.rows;
data += sizeof(int32_t);
int32_t* cols = (int32_t*)data;
*cols = numOfCols;
data += sizeof(int32_t);
int32_t* hasColumnSegment = (int32_t*)data;
*hasColumnSegment = 1;
data += sizeof(int32_t);
uint64_t* groupId = (uint64_t*)data;
data += sizeof(uint64_t);
@ -2130,12 +2142,26 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
*groupId = pBlock->info.groupId;
}
const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
const char* pStart = pData;
// total length sizeof(int32_t)
int32_t dataLen = *(int32_t*)pStart;
pStart += sizeof(int32_t);
// total rows sizeof(int32_t)
int32_t numOfRows = *(int32_t*)pStart;
pStart += sizeof(int32_t);
// total columns sizeof(int32_t)
int32_t numOfCols = *(int32_t*)pStart;
pStart += sizeof(int32_t);
// has column info segment
int32_t hasColumnInfo = *(int32_t*)pStart;
pStart += sizeof(int32_t);
// group id sizeof(uint64_t)
pBlock->info.groupId = *(uint64_t*)pStart;
pStart += sizeof(uint64_t);

View File

@ -2714,7 +2714,6 @@ void tsdbReaderClose(STsdbReader* pReader) {
}
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap);
taosMemoryFreeClear(pSupInfo->plist);
@ -2742,10 +2741,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
SIOCostSummary* pCost = &pReader->cost;
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
" SMA-time:%.2f ms, "
"fileBlocks:%" PRId64
", fileBlocks-time:%.2f ms, build in-memory-block-time:%.2f ms, STableBlockScanInfo "
"size:%.2f Kb %s",
" SMA-time:%.2f ms, fileBlocks:%" PRId64 ", fileBlocks-time:%.2f ms, "
"build in-memory-block-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s",
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime,
pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock,
numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);

View File

@ -266,7 +266,11 @@ typedef struct SExchangeInfo {
SArray* pSourceDataInfo;
tsem_t ready;
void* pTransporter;
SSDataBlock* pResult;
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator
SArray* pResultBlockList;
int32_t rspBlockIndex; // indicate the return block index in pResultBlockList
SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default
int32_t current;
SLoadRemoteDataInfo loadInfo;
@ -855,8 +859,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWin
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList);
int32_t compLen, int32_t numOfOutput, uint64_t* total, SArray* pColList, char** pNextStart);
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);

View File

@ -69,10 +69,10 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
// clang-format off
// data format:
// +----------------+--------------+-----------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | total length | group id | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes)
// | |sizeof(int32) |sizeof(uint64_t) |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | |
// +----------------+--------------+-----------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
// +----------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | total length | numOfRows | group id | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes)
// | |sizeof(int32) | sizeof(int32)| sizeof(uint64_t) |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | |
// +----------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header
// clang-format on

View File

@ -1956,6 +1956,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
pRsp->compLen = htonl(pRsp->compLen);
pRsp->numOfCols = htonl(pRsp->numOfCols);
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
ASSERT(pRsp != NULL);
qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
@ -2043,12 +2044,10 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
}
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList) {
int32_t compLen, int32_t numOfOutput, uint64_t* total, SArray* pColList, char** pNextStart) {
if (pColList == NULL) { // data from other sources
blockDataCleanup(pRes);
// blockDataEnsureCapacity(pRes, numOfRows);
blockDecode(pRes, numOfOutput, numOfRows, pData);
*pNextStart = (char*) blockDecode(pRes, pData);
} else { // extract data according to pColList
ASSERT(numOfOutput == taosArrayGetSize(pColList));
char* pStart = pData;
@ -2072,7 +2071,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
blockDataAppendColInfo(pBlock, &idata);
}
blockDecode(pBlock, numOfCols, numOfRows, pStart);
blockDecode(pBlock, pStart);
blockDataEnsureCapacity(pRes, numOfRows);
// data from mnode
@ -2084,8 +2083,6 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
// todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
blockDataUpdateTsWindow(pRes, 0);
int64_t el = taosGetTimestampUs() - startTs;
pLoadInfo->totalRows += numOfRows;
pLoadInfo->totalSize += compLen;
@ -2093,7 +2090,6 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
*total += numOfRows;
}
pLoadInfo->totalElapsed += el;
return TSDB_CODE_SUCCESS;
}
@ -2115,8 +2111,8 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
return NULL;
}
static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
SExecTaskInfo* pTaskInfo) {
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
SExecTaskInfo* pTaskInfo) {
int32_t code = 0;
int64_t startTs = taosGetTimestampUs();
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
@ -2142,7 +2138,6 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);
SSDataBlock* pRes = pExchangeInfo->pResult;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
@ -2155,29 +2150,37 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
continue;
}
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
code =
extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
if (code != 0) {
taosMemoryFreeClear(pDataInfo->pRsp);
goto _error;
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
int32_t index = 0;
char* pStart = pRetrieveRsp->data;
while(index++ < pRetrieveRsp->numOfBlocks) {
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
blockDataEnsureCapacity(pb, pRetrieveRsp->numOfRows);
code =
extractDataBlockFromFetchRsp(pb, pLoadInfo, pRetrieveRsp->numOfRows, pStart,
pRetrieveRsp->compLen, pRetrieveRsp->numOfCols, &pDataInfo->totalRows, NULL, &pStart);
if (code != 0) {
taosMemoryFreeClear(pDataInfo->pRsp);
goto _error;
}
taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
}
pLoadInfo->totalElapsed += (taosGetTimestampUs() - startTs);
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d"
" index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64
", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRes->info.rows,
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources);
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d"
" index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", total:%.2f Kb,"
" completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfRows,
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize/1024.0, completed + 1, i + 1, totalSources);
completed += 1;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
pLoadInfo->totalRows, pLoadInfo->totalSize);
", total:%.2f Kb", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId,
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize/1024.0);
}
taosMemoryFreeClear(pDataInfo->pRsp);
@ -2191,11 +2194,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
}
}
return pExchangeInfo->pResult;
return;
}
if (completed == totalSources) {
return setAllSourcesCompleted(pOperator, startTs);
setAllSourcesCompleted(pOperator, startTs);
return;
}
sched_yield();
@ -2203,7 +2207,6 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
_error:
pTaskInfo->code = code;
return NULL;
}
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
@ -2233,7 +2236,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -2242,7 +2245,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
while (1) {
if (pExchangeInfo->current >= totalSources) {
return setAllSourcesCompleted(pOperator, startTs);
setAllSourcesCompleted(pOperator, startTs);
return TSDB_CODE_SUCCESS;
}
doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
@ -2255,7 +2259,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
pOperator->pTaskInfo->code = pDataInfo->code;
return NULL;
return pOperator->pTaskInfo->code;
}
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
@ -2272,16 +2276,16 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
continue;
}
SSDataBlock* pRes = pExchangeInfo->pResult;
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
int32_t code =
extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
char* pStart = pRetrieveRsp->data;
int32_t code = extractDataBlockFromFetchRsp(NULL, pLoadInfo, pRetrieveRsp->numOfRows, pStart, pRetrieveRsp->compLen,
pRetrieveRsp->numOfCols, &pDataInfo->totalRows, NULL, &pStart);
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
totalSources);
@ -2290,13 +2294,13 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
pLoadInfo->totalRows, pLoadInfo->totalSize);
}
pOperator->resultInfo.totalRows += pRes->info.rows;
pOperator->resultInfo.totalRows += pRetrieveRsp->numOfRows;
taosMemoryFreeClear(pDataInfo->pRsp);
return pExchangeInfo->pResult;
return TSDB_CODE_SUCCESS;
}
}
@ -2320,6 +2324,11 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
static void freeBlock(void* pParam) {
SSDataBlock* pBlock = *(SSDataBlock**)pParam;
blockDataDestroy(pBlock);
}
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -2329,9 +2338,9 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
return NULL;
}
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pOperator->status == OP_EXEC_DONE) {
qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
@ -2339,11 +2348,23 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
return NULL;
}
if (pExchangeInfo->seqLoadData) {
return seqLoadRemoteData(pOperator);
} else {
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
size_t size = taosArrayGetSize(pExchangeInfo->pResultBlockList);
if (size == 0 || pExchangeInfo->rspBlockIndex >= size) {
pExchangeInfo->rspBlockIndex = 0;
taosArrayClearEx(pExchangeInfo->pResultBlockList, freeBlock);
if (pExchangeInfo->seqLoadData) {
seqLoadRemoteData(pOperator);
} else {
concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
return NULL;
}
}
// we have buffered retrieved datablock, return it directly
return taosArrayGetP(pExchangeInfo->pResultBlockList, pExchangeInfo->rspBlockIndex++);
}
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
@ -2360,26 +2381,24 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
return NULL;
}
ASSERT(pBlock == pExchangeInfo->pResult);
SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
if (hasLimitOffsetInfo(pLimitInfo)) {
int32_t status = handleLimitOffset(pOperator, pLimitInfo, pExchangeInfo->pResult, false);
int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
if (status == PROJECT_RETRIEVE_CONTINUE) {
continue;
} else if (status == PROJECT_RETRIEVE_DONE) {
size_t rows = pExchangeInfo->pResult->info.rows;
size_t rows = pBlock->info.rows;
pExchangeInfo->limitInfo.numOfOutputRows += rows;
if (rows == 0) {
doSetOperatorCompleted(pOperator);
return NULL;
} else {
return pExchangeInfo->pResult;
return pBlock;
}
}
} else {
return pExchangeInfo->pResult;
return pBlock;
}
}
}
@ -2442,16 +2461,18 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
}
tsem_init(&pInfo->ready, 0, 0);
pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
pInfo->pResultBlockList = taosArrayInit(1, POINTER_BYTES);
pInfo->seqLoadData = false;
pInfo->pTransporter = pTransporter;
pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pResult->pDataBlock);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
@ -3585,12 +3606,15 @@ void doDestroyExchangeOperatorInfo(void* param) {
taosArrayDestroy(pExInfo->pSources);
taosArrayDestroy(pExInfo->pSourceDataInfo);
if (pExInfo->pResult != NULL) {
pExInfo->pResult = blockDataDestroy(pExInfo->pResult);
if (pExInfo->pResultBlockList != NULL) {
taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
pExInfo->pResultBlockList = NULL;
}
tsem_destroy(&pExInfo->ready);
blockDataDestroy(pExInfo->pDummyBlock);
tsem_destroy(&pExInfo->ready);
taosMemoryFreeClear(param);
}

View File

@ -2266,9 +2266,11 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
}
}
char* pStart = pRsp->data;
extractDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen,
pOperator->exprSupp.numOfExprs, startTs, NULL, pInfo->scanCols);
pOperator->exprSupp.numOfExprs, NULL, pInfo->scanCols, &pStart);
//startTs,
// todo log the filter info
doFilterResult(pInfo);
taosMemoryFree(pRsp);

View File

@ -27,10 +27,9 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen));
for (int32_t i = 0; i < blockNum; i++) {
/*int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);*/
SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i);
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
blockDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
blockDecode(pDataBlock, pRetrieve->data);
// TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
@ -51,7 +50,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
taosArraySetSize(pArray, 1);
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
SSDataBlock* pDataBlock = taosArrayGet(pArray, 0);
blockDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
blockDecode(pDataBlock, pRetrieve->data);
// TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);