diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 27d99bd42d..7c39e378cb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1621,7 +1621,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* int64_t st = taosGetTimestampUs(); int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader); - blockDataUpdateTsWindow(pBlock, 0); + blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotIds[0]); pBlock->info.id.uid = pBlockScanInfo->uid; setComposedBlockFlag(pReader, true); @@ -2493,7 +2493,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { _end: pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; - blockDataUpdateTsWindow(pResBlock, 0); + blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotIds[0]); setComposedBlockFlag(pReader, true); double el = (taosGetTimestampUs() - st) / 1000.0; @@ -3534,7 +3534,6 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, STableBlockScanInfo* pScanInfo) { int32_t outputRowIndex = pBlock->info.rows; - int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); int64_t uid = pScanInfo->uid; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; @@ -3549,7 +3548,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* i += 1; } - while (i < numOfCols && j < pSchema->numOfCols) { + while (i < pSupInfo->numOfCols && j < pSchema->numOfCols) { col_id_t colId = pSupInfo->colIds[i]; if (colId == pSchema->columns[j].colId) { @@ -3570,7 +3569,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* } // set null value since current column does not exist in the "pSchema" - while (i < numOfCols) { + while (i < pSupInfo->numOfCols) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotIds[i]); colDataAppendNULL(pColInfoData, outputRowIndex); i += 1; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1729c9f1e8..9fc3575491 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2513,10 +2513,8 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t readIdx = source->readerIdx; SSDataBlock* pBlock = source->inputBlock; - STableMergeScanInfo* pTableScanInfo = pOperator->info; - SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx); - blockDataCleanup(pBlock); + SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx); int64_t st = taosGetTimestampUs(); void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); @@ -2534,12 +2532,11 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } // process this data block based on the probabilities - bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); + bool processThisBlock = processBlockWithProbability(&pInfo->sample); if (!processThisBlock) { continue; } - blockDataCleanup(pBlock); if (pQueryCond->order == TSDB_ORDER_ASC) { pQueryCond->twindows.skey = pBlock->info.window.ekey + 1; } else { @@ -2547,7 +2544,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } uint32_t status = 0; - loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); + loadDataBlock(pOperator, &pInfo->base, pBlock, &status); // code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); @@ -2561,7 +2558,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; - pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; tsdbReaderClose(pInfo->base.dataReader); pInfo->base.dataReader = NULL; @@ -2641,6 +2638,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { param.readerIdx = i; param.pOperator = pOperator; param.inputBlock = createOneDataBlock(pInfo->pResBlock, false); + blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity); + taosArrayPush(pInfo->sortSourceParams, ¶m); SQueryTableDataCond cond; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 5f077c98d7..52f06ffab4 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1659,22 +1659,11 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt // the primary timestamp column bool needed = false; - for (int32_t i = 0; i < numOfCols; ++i) { + for(int32_t i = 0; i < numOfCols; ++i) { SExprInfo* pExpr = pCtx[i].pExpr; - if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) { - SFunctParam* pParam = &pExpr->base.pParam[0]; - - SColumn c = *pParam->pCol; - taosArrayPush(pInfo->pInterpCols, &c); needed = true; - - SGroupKeys key = {0}; - key.bytes = c.bytes; - key.type = c.type; - key.isNull = false; - key.pData = taosMemoryCalloc(1, c.bytes); - taosArrayPush(pInfo->pPrevValues, &key); + break; } } @@ -1699,6 +1688,24 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt } } + for (int32_t i = 0; i < numOfCols; ++i) { + SExprInfo* pExpr = pCtx[i].pExpr; + + if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) { + SFunctParam* pParam = &pExpr->base.pParam[0]; + + SColumn c = *pParam->pCol; + taosArrayPush(pInfo->pInterpCols, &c); + + SGroupKeys key = {0}; + key.bytes = c.bytes; + key.type = c.type; + key.isNull = false; + key.pData = taosMemoryCalloc(1, c.bytes); + taosArrayPush(pInfo->pPrevValues, &key); + } + } + return needed; }