[td-11818] refactor.
This commit is contained in:
parent
1ae36227e4
commit
896b269fab
|
@ -3990,7 +3990,7 @@ static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* p
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void toSSDataBlock(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock) {
|
static void toSDatablock(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock) {
|
||||||
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
|
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
|
||||||
|
|
||||||
pBlock->info.rows = 0;
|
pBlock->info.rows = 0;
|
||||||
|
@ -6167,7 +6167,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
|
||||||
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
|
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
|
||||||
|
|
||||||
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
@ -6221,7 +6221,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
|
||||||
|
|
||||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo);
|
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo);
|
||||||
|
|
||||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
|
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
|
||||||
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -6411,7 +6411,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
|
||||||
|
|
||||||
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
||||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -6451,7 +6451,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
|
||||||
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
|
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
|
||||||
|
|
||||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo);
|
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo);
|
||||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
||||||
|
|
||||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
|
@ -6470,7 +6470,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
|
||||||
|
|
||||||
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
||||||
|
|
||||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
|
@ -6511,7 +6511,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
|
||||||
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
|
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
|
||||||
|
|
||||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo);
|
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo);
|
||||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
||||||
|
|
||||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
@ -6725,7 +6725,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
|
||||||
|
|
||||||
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
||||||
|
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
@ -6763,7 +6763,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
|
||||||
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
|
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
|
||||||
|
|
||||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
|
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
|
||||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
||||||
|
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
@ -6784,7 +6784,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
|
||||||
|
|
||||||
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
||||||
|
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
@ -6823,7 +6823,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
|
||||||
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
|
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
|
||||||
|
|
||||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
|
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
|
||||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
||||||
|
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
@ -6842,7 +6842,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
|
||||||
|
|
||||||
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
|
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
|
||||||
|
|
||||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
@ -6886,7 +6886,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
|
||||||
sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
|
sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
|
toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
|
||||||
|
|
||||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
|
Loading…
Reference in New Issue