Merge pull request #27389 from taosdata/fix/TS-5311

fix(query):set group id for the results of event and count operators
This commit is contained in:
dapan1121 2024-08-23 14:36:20 +08:00 committed by GitHub
commit 89198c820d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 175 additions and 15 deletions

View File

@ -32,6 +32,7 @@ typedef struct SCountWindowResult {
typedef struct SCountWindowSupp {
SArray* pWinStates;
int32_t stateIndex;
int32_t curStateIndex;
} SCountWindowSupp;
typedef struct SCountWindowOperatorInfo {
@ -45,6 +46,8 @@ typedef struct SCountWindowOperatorInfo {
int32_t windowCount;
int32_t windowSliding;
SCountWindowSupp countSup;
SSDataBlock* pPreDataBlock;
int32_t preStateIndex;
} SCountWindowOperatorInfo;
void destroyCountWindowOperatorInfo(void* param) {
@ -65,6 +68,7 @@ static void clearWinStateBuff(SCountWindowResult* pBuff) { pBuff->winRows = 0; }
static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) {
SCountWindowResult* pBuffInfo = taosArrayGet(pCountSup->pWinStates, pCountSup->stateIndex);
pCountSup->curStateIndex = pCountSup->stateIndex;
if (!pBuffInfo) {
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
@ -113,6 +117,19 @@ void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
TSKEY* tsCols = (TSKEY*)pColInfoData->pData;
int32_t numOfBuff = taosArrayGetSize(pInfo->countSup.pWinStates);
if (numOfBuff == 0) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
pInfo->countSup.stateIndex = (pInfo->preStateIndex + 1) % numOfBuff;
int32_t newSize = pRes->info.rows + pBlock->info.rows / pInfo->windowSliding + 1;
if (newSize > pRes->info.capacity) {
code = blockDataEnsureCapacity(pRes, newSize);
QUERY_CHECK_CODE(code, lino, _end);
}
for (int32_t i = 0; i < pBlock->info.rows;) {
SCountWindowResult* pBuffInfo = NULL;
@ -132,14 +149,6 @@ void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->pRow->win, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, i, num,
pBlock->info.rows, pExprSup->numOfExprs);
if (pBuffInfo->winRows == pInfo->windowCount) {
doUpdateNumOfRows(pExprSup->pCtx, pInfo->pRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pInfo->pRow, pExprSup->pCtx, pRes,
pExprSup->rowEntryInfoOffset, pTaskInfo);
pRes->info.rows += pInfo->pRow->numOfRows;
clearWinStateBuff(pBuffInfo);
clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
}
if (pInfo->windowCount != pInfo->windowSliding) {
if (prevRows <= pInfo->windowSliding) {
if (pBuffInfo->winRows > pInfo->windowSliding) {
@ -151,9 +160,21 @@ void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
step = 0;
}
}
if (pBuffInfo->winRows == pInfo->windowCount) {
doUpdateNumOfRows(pExprSup->pCtx, pInfo->pRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pInfo->pRow, pExprSup->pCtx, pRes,
pExprSup->rowEntryInfoOffset, pTaskInfo);
pRes->info.rows += pInfo->pRow->numOfRows;
clearWinStateBuff(pBuffInfo);
pInfo->preStateIndex = pInfo->countSup.curStateIndex;
clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
}
i += step;
}
code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
@ -163,11 +184,18 @@ _end:
}
static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SExecTaskInfo* pTaskInfo,
SFilterInfo* pFilterInfo, SSDataBlock* pBlock) {
SFilterInfo* pFilterInfo, int32_t preStateIndex, SSDataBlock* pBlock) {
SResultRow* pResultRow = NULL;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
for (int32_t i = 0; i < taosArrayGetSize(pCountSup->pWinStates); i++) {
int32_t numOfBuff = taosArrayGetSize(pCountSup->pWinStates);
int32_t newSize = pBlock->info.rows + numOfBuff;
if (newSize > pBlock->info.capacity) {
code = blockDataEnsureCapacity(pBlock, newSize);
QUERY_CHECK_CODE(code, lino, _end);
}
pCountSup->stateIndex = (preStateIndex + 1) % numOfBuff;
for (int32_t i = 0; i < numOfBuff; i++) {
SCountWindowResult* pBuff = NULL;
code = setCountWindowOutputBuff(pExprSup, pCountSup, &pResultRow, &pBuff);
QUERY_CHECK_CODE(code, lino, _end);
@ -204,7 +232,14 @@ static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock**
blockDataCleanup(pRes);
while (1) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
SSDataBlock* pBlock = NULL;
if (pInfo->pPreDataBlock == NULL) {
pBlock = getNextBlockFromDownstream(pOperator, 0);
} else {
pBlock = pInfo->pPreDataBlock;
pInfo->pPreDataBlock = NULL;
}
if (pBlock == NULL) {
break;
}
@ -226,18 +261,26 @@ static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock**
if (pInfo->groupId == 0) {
pInfo->groupId = pBlock->info.id.groupId;
} else if (pInfo->groupId != pBlock->info.id.groupId) {
buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pOperator->exprSupp.pFilterInfo, pRes);
pInfo->pPreDataBlock = pBlock;
pRes->info.id.groupId = pInfo->groupId;
buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pOperator->exprSupp.pFilterInfo, pInfo->preStateIndex, pRes);
pInfo->groupId = pBlock->info.id.groupId;
if (pRes->info.rows > 0) {
(*ppRes) = pRes;
return code;
}
}
doCountWindowAggImpl(pOperator, pBlock);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
pRes->info.id.groupId = pInfo->groupId;
(*ppRes) = pRes;
return code;
}
}
buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pOperator->exprSupp.pFilterInfo, pRes);
pRes->info.id.groupId = pInfo->groupId;
buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pOperator->exprSupp.pFilterInfo, pInfo->preStateIndex, pRes);
_end:
if (code != TSDB_CODE_SUCCESS) {
@ -320,6 +363,8 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
}
pInfo->countSup.stateIndex = 0;
pInfo->pPreDataBlock = NULL;
pInfo->preStateIndex = 0;
code = filterInitFromNode((SNode*)pCountWindowNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -36,6 +36,7 @@ typedef struct SEventWindowOperatorInfo {
SFilterInfo* pEndCondInfo;
bool inWindow;
SResultRow* pRow;
SSDataBlock* pPreDataBlock;
} SEventWindowOperatorInfo;
static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator);
@ -126,6 +127,7 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
QUERY_CHECK_CODE(code, lino, _error);
pInfo->tsSlotId = tsSlotId;
pInfo->pPreDataBlock = NULL;
setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
@ -199,7 +201,14 @@ static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock**
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
SSDataBlock* pBlock = NULL;
if (pInfo->pPreDataBlock == NULL) {
pBlock = getNextBlockFromDownstream(pOperator, 0);
} else {
pBlock = pInfo->pPreDataBlock;
pInfo->pPreDataBlock = NULL;
}
if (pBlock == NULL) {
break;
}
@ -224,7 +233,8 @@ static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock**
code = doFilter(pRes, pSup->pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
if (pRes->info.rows >= pOperator->resultInfo.threshold ||
(pRes->info.id.groupId != pInfo->groupId && pRes->info.rows > 0)) {
(*ppRes) = pRes;
return code;
}
@ -303,7 +313,10 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
// this is a new group, reset the info
pInfo->inWindow = false;
pInfo->groupId = gid;
pInfo->pPreDataBlock = pBlock;
goto _return;
}
pRes->info.id.groupId = pInfo->groupId;
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};

View File

@ -236,4 +236,54 @@ sql_error select count(*) from sta event_window start with f1 >0 end with f2 > 0
sql_error select count(*) from sta event_window start with f1 >0 end with f2 > 0 group by tbname;
sql_error select count(*) from sta event_window start with f1 >0 end with f2 > 0 fill(NULL);
print step2
print =============== create database
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(3,3,3);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sql insert into t1 values(1648791243003,4,2,3,3.1);
sql insert into t1 values(1648791253004,5,2,3,4.1);
sql insert into t2 values(1648791213000,1,2,3,1.0);
sql insert into t2 values(1648791223001,2,2,3,1.1);
sql insert into t2 values(1648791233002,3,2,3,2.1);
sql insert into t2 values(1648791243003,4,2,3,3.1);
sql insert into t2 values(1648791253004,5,2,3,4.1);
sql insert into t3 values(1648791213000,1,2,3,1.0);
sql insert into t3 values(1648791223001,2,2,3,1.1);
sql insert into t3 values(1648791233002,3,2,3,2.1);
sql insert into t3 values(1648791243003,4,2,3,3.1);
sql insert into t3 values(1648791253004,5,2,3,4.1);
$loop_count = 0
loop4:
sleep 300
sql select _wstart, count(*) c1, tbname from st partition by tbname event_window start with a > 0 end with b = 2 slimit 2 limit 2;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 4 then
print ======rows=$rows
goto loop4
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -101,5 +101,57 @@ sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_windo
sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(2147483647);
print step3
print =============== create database
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(3,3,3);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sql insert into t1 values(1648791243003,4,2,3,3.1);
sql insert into t1 values(1648791253004,5,2,3,4.1);
sql insert into t2 values(1648791213000,1,2,3,1.0);
sql insert into t2 values(1648791223001,2,2,3,1.1);
sql insert into t2 values(1648791233002,3,2,3,2.1);
sql insert into t2 values(1648791243003,4,2,3,3.1);
sql insert into t2 values(1648791253004,5,2,3,4.1);
sql insert into t3 values(1648791213000,1,2,3,1.0);
sql insert into t3 values(1648791223001,2,2,3,1.1);
sql insert into t3 values(1648791233002,3,2,3,2.1);
sql insert into t3 values(1648791243003,4,2,3,3.1);
sql insert into t3 values(1648791253004,5,2,3,4.1);
$loop_count = 0
loop4:
sleep 300
sql select _wstart, count(*) c1, tbname from st partition by tbname count_window(2) slimit 2 limit 2;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 4 then
print ======rows=$rows
goto loop4
endi
print query_count0 end
system sh/exec.sh -n dnode1 -s stop -x SIGINT