[td-255] code refactor.
This commit is contained in:
parent
114635ad19
commit
e119a53307
|
@ -6388,6 +6388,19 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SQueryRuntimeEnv* pRuntimeEnv, bool* newgroup) {
|
||||||
|
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||||
|
int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
||||||
|
taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
|
||||||
|
|
||||||
|
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||||
|
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||||
|
|
||||||
|
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
||||||
|
pInfo->existNewGroupBlock = NULL;
|
||||||
|
*newgroup = true;
|
||||||
|
}
|
||||||
|
|
||||||
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRuntimeEnv *pRuntimeEnv, bool *newgroup) {
|
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRuntimeEnv *pRuntimeEnv, bool *newgroup) {
|
||||||
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||||
*newgroup = false;
|
*newgroup = false;
|
||||||
|
@ -6399,16 +6412,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRunt
|
||||||
|
|
||||||
// handle the cached new group data block
|
// handle the cached new group data block
|
||||||
if (pInfo->existNewGroupBlock) {
|
if (pInfo->existNewGroupBlock) {
|
||||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup);
|
||||||
int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
|
||||||
taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
|
|
||||||
|
|
||||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
|
||||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
|
||||||
|
|
||||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
|
||||||
pInfo->existNewGroupBlock = NULL;
|
|
||||||
*newgroup = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6427,26 +6431,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
|
||||||
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
|
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
}
|
}
|
||||||
// if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
|
||||||
// *newgroup = false;
|
|
||||||
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
|
|
||||||
// return pInfo->pRes;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // handle the cached new group data block
|
|
||||||
// if (pInfo->existNewGroupBlock) {
|
|
||||||
// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
|
||||||
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
|
||||||
// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
|
|
||||||
//
|
|
||||||
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
|
||||||
// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
|
||||||
//
|
|
||||||
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
|
||||||
// pInfo->existNewGroupBlock = NULL;
|
|
||||||
// *newgroup = true;
|
|
||||||
// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
|
||||||
// }
|
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
|
@ -6493,46 +6477,13 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
|
||||||
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) {
|
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) {
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
|
||||||
// *newgroup = false;
|
|
||||||
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
|
|
||||||
// return pInfo->pRes;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // handle the cached new group data block
|
|
||||||
// if (pInfo->existNewGroupBlock) {
|
|
||||||
// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
|
||||||
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
|
||||||
// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
|
|
||||||
//
|
|
||||||
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
|
||||||
// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
|
||||||
//
|
|
||||||
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
|
|
||||||
// pInfo->existNewGroupBlock = NULL;
|
|
||||||
// *newgroup = true;
|
|
||||||
//
|
|
||||||
// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) {
|
|
||||||
// return pInfo->pRes;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
//// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
|
||||||
// }
|
|
||||||
|
|
||||||
} else if (pInfo->existNewGroupBlock) { // try next group
|
} else if (pInfo->existNewGroupBlock) { // try next group
|
||||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
assert(pBlock != NULL);
|
||||||
int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey;
|
doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup);
|
||||||
taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
|
|
||||||
|
|
||||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) {
|
||||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
return pInfo->pRes;
|
||||||
|
}
|
||||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
|
||||||
pInfo->existNewGroupBlock = NULL;
|
|
||||||
*newgroup = true;
|
|
||||||
|
|
||||||
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
|
|
||||||
} else {
|
} else {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue