Merge pull request #14904 from taosdata/feature/3_liaohj
fix(query): handle the grouped fill.
This commit is contained in:
commit
34c973387d
|
@ -560,6 +560,7 @@ typedef struct SFillOperatorInfo {
|
|||
SNode* pCondition;
|
||||
SArray* pColMatchColInfo;
|
||||
int32_t primaryTsCol;
|
||||
uint64_t curGroupId; // current handled group id
|
||||
} SFillOperatorInfo;
|
||||
|
||||
typedef struct SGroupbyOperatorInfo {
|
||||
|
|
|
@ -1637,8 +1637,6 @@ static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows,
|
|||
|
||||
int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) {
|
||||
int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows);
|
||||
pBlock->info.rows += numOfRows;
|
||||
|
||||
return pBlock->info.rows;
|
||||
}
|
||||
|
||||
|
@ -3344,14 +3342,15 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult
|
|||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity);
|
||||
pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId;
|
||||
pInfo->existNewGroupBlock = NULL;
|
||||
*newgroup = true;
|
||||
// *newgroup = true;
|
||||
}
|
||||
|
||||
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||
*newgroup = false;
|
||||
// *newgroup = false;
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity);
|
||||
if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult)) {
|
||||
return;
|
||||
|
@ -3373,10 +3372,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
|
||||
blockDataCleanup(pResBlock);
|
||||
|
||||
// todo handle different group data interpolation
|
||||
bool n = false;
|
||||
bool* newgroup = &n;
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, NULL, pTaskInfo);
|
||||
if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) {
|
||||
return pResBlock;
|
||||
}
|
||||
|
@ -3384,31 +3380,29 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
|
||||
if (*newgroup) {
|
||||
assert(pBlock != NULL);
|
||||
}
|
||||
if (pBlock == NULL) {
|
||||
if (pInfo->totalInputRows == 0) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol);
|
||||
|
||||
if (*newgroup && pInfo->totalInputRows > 0) { // there are already processed current group data block
|
||||
pInfo->existNewGroupBlock = pBlock;
|
||||
*newgroup = false;
|
||||
|
||||
// Fill the previous group data block, before handle the data block of new group.
|
||||
// Close the fill operation for previous group data block
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
|
||||
} else {
|
||||
if (pBlock == NULL) {
|
||||
if (pInfo->totalInputRows == 0) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
}
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol);
|
||||
|
||||
if (pInfo->curGroupId == 0 || pInfo->curGroupId == pBlock->info.groupId) {
|
||||
pInfo->curGroupId = pBlock->info.groupId; // the first data block
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
|
||||
} else {
|
||||
pInfo->totalInputRows += pBlock->info.rows;
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
|
||||
} else if (pInfo->curGroupId != pBlock->info.groupId) { // the new group data block
|
||||
pInfo->existNewGroupBlock = pBlock;
|
||||
|
||||
// Fill the previous group data block, before handle the data block of new group.
|
||||
// Close the fill operation for previous group data block
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3419,17 +3413,17 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
if (pResBlock->info.rows > 0) {
|
||||
// 1. The result in current group not reach the threshold of output result, continue
|
||||
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
|
||||
if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) {
|
||||
if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
|
||||
return pResBlock;
|
||||
}
|
||||
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
|
||||
if (pResBlock->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) {
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, NULL, pTaskInfo);
|
||||
if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
|
||||
return pResBlock;
|
||||
}
|
||||
} else if (pInfo->existNewGroupBlock) { // try next group
|
||||
assert(pBlock != NULL);
|
||||
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
|
||||
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, NULL, pTaskInfo);
|
||||
if (pResBlock->info.rows > pResultInfo->threshold) {
|
||||
return pResBlock;
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
|||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
||||
|
||||
// set the primary timestamp column value
|
||||
int32_t index = pFillInfo->numOfCurrent;
|
||||
int32_t index = pBlock->info.rows;
|
||||
|
||||
// set the other values
|
||||
if (pFillInfo->type == TSDB_FILL_PREV) {
|
||||
|
@ -191,6 +191,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
|||
SInterval* pInterval = &pFillInfo->interval;
|
||||
pFillInfo->currentKey =
|
||||
taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
|
||||
pBlock->info.rows += 1;
|
||||
pFillInfo->numOfCurrent++;
|
||||
}
|
||||
|
||||
|
@ -273,6 +274,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
|||
}
|
||||
} else {
|
||||
assert(pFillInfo->currentKey == ts);
|
||||
int32_t index = pBlock->info.rows;
|
||||
|
||||
if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) {
|
||||
int32_t nextRowIndex = pFillInfo->index + 1;
|
||||
|
@ -296,24 +298,24 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
|||
if (i == 0 || (/*pCol->functionId != FUNCTION_COUNT &&*/ !colDataIsNull_s(pSrc, pFillInfo->index)) /*||
|
||||
(pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)*/) {
|
||||
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
|
||||
colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull);
|
||||
colDataAppend(pDst, index, src, isNull);
|
||||
saveColData(pFillInfo->prev, i, src, isNull);
|
||||
} else { // i > 0 and data is null , do interpolation
|
||||
if (pFillInfo->type == TSDB_FILL_PREV) {
|
||||
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
|
||||
doSetVal(pDst, pFillInfo->numOfCurrent, pKey);
|
||||
doSetVal(pDst, index, pKey);
|
||||
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
|
||||
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
|
||||
colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull);
|
||||
colDataAppend(pDst, index, src, isNull);
|
||||
saveColData(pFillInfo->prev, i, src, isNull);
|
||||
} else if (pFillInfo->type == TSDB_FILL_NULL) {
|
||||
colDataAppendNULL(pDst, pFillInfo->numOfCurrent);
|
||||
colDataAppendNULL(pDst, index);
|
||||
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
|
||||
SGroupKeys* pKey = taosArrayGet(pFillInfo->next, i);
|
||||
doSetVal(pDst, pFillInfo->numOfCurrent, pKey);
|
||||
doSetVal(pDst, index, pKey);
|
||||
} else {
|
||||
SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
|
||||
colDataAppend(pDst, pFillInfo->numOfCurrent, (char*)&pVar->i, false);
|
||||
colDataAppend(pDst, index, (char*)&pVar->i, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -324,6 +326,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
|||
pFillInfo->currentKey =
|
||||
taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
|
||||
|
||||
pBlock->info.rows += 1;
|
||||
pFillInfo->index += 1;
|
||||
pFillInfo->numOfCurrent += 1;
|
||||
}
|
||||
|
|
|
@ -6026,8 +6026,6 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
pInfo->ts = cts;
|
||||
pResInfo->numOfRes = 1;
|
||||
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
||||
if (!pInfo->hasResult) {
|
||||
|
|
Loading…
Reference in New Issue