fix(query): set correct stddev merge input rows.

This commit is contained in:
Haojun Liao 2022-07-10 11:02:54 +08:00
parent 9738bb4aff
commit 3ee9d160d9
4 changed files with 7 additions and 21 deletions

View File

@ -911,7 +911,6 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);

View File

@ -4546,18 +4546,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOptr;
}
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param) {
const SQueryTableDataCond* pCond = param;
const STimeWindow* pWin1 = p1;
const STimeWindow* pWin2 = p2;
if (pCond->order == TSDB_ORDER_ASC) {
return pWin1->skey - pWin2->skey;
} else if (pCond->order == TSDB_ORDER_DESC) {
return pWin2->skey - pWin1->skey;
}
return 0;
}
SArray* extractColumnInfo(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));

View File

@ -1156,11 +1156,13 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBlock);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
if (pBlock->info.rows > 0) {
break;
}

View File

@ -1863,8 +1863,6 @@ static void stddevTransferInfo(SStddevRes* pInput, SStddevRes* pOutput) {
}
pOutput->count += pInput->count;
return;
}
int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) {
@ -1874,14 +1872,13 @@ int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) {
SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start);
SStddevRes* pInputInfo = (SStddevRes*)varDataVal(data);
stddevTransferInfo(pInputInfo, pInfo);
for(int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SStddevRes* pInputInfo = (SStddevRes*)varDataVal(data);
stddevTransferInfo(pInputInfo, pInfo);
}
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
return TSDB_CODE_SUCCESS;
}