fix(query): assign the group id in the ssdatablock when generating grouped results.
This commit is contained in:
parent
7b55cb0df2
commit
5e819fbdb3
|
@ -2062,15 +2062,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
|
||||||
pAggInfo->groupId = groupId;
|
pAggInfo->groupId = groupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
||||||
* For interval query of both super table and table, copy the data in ascending order, since the output results are
|
|
||||||
* ordered in SWindowResutl already. While handling the group by query for both table and super table,
|
|
||||||
* all group result are completed already.
|
|
||||||
*
|
|
||||||
* @param pQInfo
|
|
||||||
* @param result
|
|
||||||
*/
|
|
||||||
int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
|
||||||
int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) {
|
int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) {
|
||||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||||
int32_t start = pGroupResInfo->index;
|
int32_t start = pGroupResInfo->index;
|
||||||
|
@ -2087,6 +2079,15 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pBlock->info.groupId == 0) {
|
||||||
|
pBlock->info.groupId = pPos->groupId;
|
||||||
|
} else {
|
||||||
|
// current value belongs to different group, it can't be packed into one datablock
|
||||||
|
if (pBlock->info.groupId != pPos->groupId) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2100,9 +2101,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
|
||||||
if (pCtx[j].fpSet.finalize) {
|
if (pCtx[j].fpSet.finalize) {
|
||||||
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||||
if (TAOS_FAILED(code)) {
|
if (TAOS_FAILED(code)) {
|
||||||
qError("%s build result data block error, code %s", GET_TASKID(taskInfo), tstrerror(code));
|
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||||
taskInfo->code = code;
|
longjmp(pTaskInfo->env, code);
|
||||||
longjmp(taskInfo->env, code);
|
|
||||||
}
|
}
|
||||||
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
||||||
// do nothing, todo refactor
|
// do nothing, todo refactor
|
||||||
|
@ -2124,7 +2124,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// qDebug("QInfo:0x%"PRIx64" copy data to query buf completed", GET_TASKID(pRuntimeEnv));
|
qDebug("%s result generated, rows:%d, groupId:%"PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.groupId);
|
||||||
blockDataUpdateTsWindow(pBlock);
|
blockDataUpdateTsWindow(pBlock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -2145,10 +2145,9 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// clear the existed group id
|
||||||
|
pBlock->info.groupId = 0;
|
||||||
doCopyToSDataBlock(pTaskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, rowCellOffset, pCtx, numOfExprs);
|
doCopyToSDataBlock(pTaskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, rowCellOffset, pCtx, numOfExprs);
|
||||||
|
|
||||||
// add condition (pBlock->info.rows >= 1) just to runtime happy
|
|
||||||
blockDataUpdateTsWindow(pBlock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo,
|
static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo,
|
||||||
|
@ -3656,7 +3655,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
doSetOperatorCompleted(pOperator);
|
|
||||||
return (blockDataGetNumOfRows(pInfo->pRes) != 0) ? pInfo->pRes : NULL;
|
return (blockDataGetNumOfRows(pInfo->pRes) != 0) ? pInfo->pRes : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue