fix(query): set the correct primary timestamp column for state window operator.
This commit is contained in:
parent
3438ed13f3
commit
231b0c6cb5
|
@ -551,6 +551,7 @@ typedef struct SStateWindowOperatorInfo {
|
||||||
int32_t colIndex; // start row index
|
int32_t colIndex; // start row index
|
||||||
bool hasKey;
|
bool hasKey;
|
||||||
SStateKeys stateKey;
|
SStateKeys stateKey;
|
||||||
|
int32_t tsSlotId; // primary timestamp column slot id
|
||||||
STimeWindowAggSupp twAggSup;
|
STimeWindowAggSupp twAggSup;
|
||||||
// bool reptScan;
|
// bool reptScan;
|
||||||
} SStateWindowOperatorInfo;
|
} SStateWindowOperatorInfo;
|
||||||
|
@ -688,7 +689,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
||||||
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
|
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
|
||||||
bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
|
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
|
||||||
|
|
|
@ -4977,13 +4977,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
||||||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
|
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, .calTrigger = pSessionNode->window.triggerType};
|
||||||
.calTrigger = pSessionNode->window.triggerType};
|
|
||||||
|
|
||||||
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
|
||||||
|
|
||||||
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
|
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
||||||
|
|
||||||
pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
|
pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
||||||
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode;
|
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode;
|
||||||
|
@ -4999,7 +4998,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
|
|
||||||
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, pTaskInfo);
|
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
|
||||||
|
|
||||||
|
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) {
|
||||||
SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)pPhyNode;
|
SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)pPhyNode;
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
|
|
|
@ -809,7 +809,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
|
||||||
|
|
||||||
SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex);
|
SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex);
|
||||||
int64_t gid = pBlock->info.groupId;
|
int64_t gid = pBlock->info.groupId;
|
||||||
|
@ -818,9 +817,9 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
||||||
int32_t numOfOutput = pOperator->numOfOutput;
|
int32_t numOfOutput = pOperator->numOfOutput;
|
||||||
|
|
||||||
int16_t bytes = pStateColInfoData->info.bytes;
|
int16_t bytes = pStateColInfoData->info.bytes;
|
||||||
int16_t type = pStateColInfoData->info.type;
|
|
||||||
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
// todo set the correct primary timestamp column
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
|
||||||
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
|
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
|
||||||
|
|
||||||
SWindowRowsSup* pRowSup = &pInfo->winSup;
|
SWindowRowsSup* pRowSup = &pInfo->winSup;
|
||||||
|
@ -1368,7 +1367,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup,
|
SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup, int32_t tsSlotId,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo) {
|
||||||
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
|
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
@ -1383,18 +1382,18 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
||||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||||
|
|
||||||
pInfo->twAggSup = *pTwAggSup;
|
pInfo->twAggSup = *pTwAggSup;
|
||||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||||
|
|
||||||
pOperator->name = "StateWindowOperator";
|
pInfo->tsSlotId = tsSlotId;
|
||||||
|
pOperator->name = "StateWindowOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW;
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blockingOptr = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->pExpr = pExpr;
|
pOperator->pExpr = pExpr;
|
||||||
pOperator->numOfOutput = numOfCols;
|
pOperator->numOfOutput = numOfCols;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->info = pInfo;
|
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL,
|
||||||
destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||||
|
|
Loading…
Reference in New Issue