Merge pull request #25341 from taosdata/fix/3_liaohj
fix(query): check the rows before apply the agg in session window.
This commit is contained in:
commit
5372988bb4
|
@ -708,9 +708,7 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i
|
||||||
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size);
|
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size);
|
||||||
|
|
||||||
pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo);
|
pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo);
|
||||||
|
|
||||||
pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1);
|
pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1);
|
||||||
|
|
||||||
pInfo->pResBlock->info.blankFill = false;
|
pInfo->pResBlock->info.blankFill = false;
|
||||||
|
|
||||||
if (!pInfo->needCountEmptyTable) {
|
if (!pInfo->needCountEmptyTable) {
|
||||||
|
@ -1011,8 +1009,8 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
||||||
STableScanInfo* pInfo = pOperator->info;
|
STableScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
STableKeyInfo* pList = NULL;
|
STableKeyInfo* pList = NULL;
|
||||||
|
|
||||||
if (pInfo->currentGroupId == -1) {
|
if (pInfo->currentGroupId == -1) {
|
||||||
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
||||||
|
@ -1020,7 +1018,10 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosRLockLatch(&pTaskInfo->lock);
|
||||||
initNextGroupScan(pInfo, &pList, &num);
|
initNextGroupScan(pInfo, &pList, &num);
|
||||||
|
taosRUnLockLatch(&pTaskInfo->lock);
|
||||||
|
|
||||||
ASSERT(pInfo->base.dataReader == NULL);
|
ASSERT(pInfo->base.dataReader == NULL);
|
||||||
|
|
||||||
int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
|
int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
|
||||||
|
|
|
@ -1332,23 +1332,25 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
|
||||||
// The gap is less than the threshold, so it belongs to current session window that has been opened already.
|
// The gap is less than the threshold, so it belongs to current session window that has been opened already.
|
||||||
doKeepTuple(pRowSup, tsList[j], gid);
|
doKeepTuple(pRowSup, tsList[j], gid);
|
||||||
} else { // start a new session window
|
} else { // start a new session window
|
||||||
SResultRow* pResult = NULL;
|
// start a new session window
|
||||||
|
if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window
|
||||||
|
SResultRow* pResult = NULL;
|
||||||
|
|
||||||
// keep the time window for the closed time window.
|
// keep the time window for the closed time window.
|
||||||
STimeWindow window = pRowSup->win;
|
STimeWindow window = pRowSup->win;
|
||||||
|
int32_t ret =
|
||||||
|
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
|
||||||
|
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||||
|
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
pRowSup->win.ekey = pRowSup->win.skey;
|
// pInfo->numOfRows data belong to the current session window
|
||||||
int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
|
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
|
||||||
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
|
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
|
||||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// pInfo->numOfRows data belong to the current session window
|
|
||||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
|
|
||||||
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
|
||||||
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
|
|
||||||
|
|
||||||
// here we start a new session window
|
// here we start a new session window
|
||||||
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
|
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
|
||||||
doKeepTuple(pRowSup, tsList[j], gid);
|
doKeepTuple(pRowSup, tsList[j], gid);
|
||||||
|
|
Loading…
Reference in New Issue