feat: split session and state window on stable
This commit is contained in:
parent
bacd7b6057
commit
849cb8d1b3
|
@ -1656,10 +1656,9 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
|||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pBInfo->pRes;
|
||||
return pBInfo->pRes->info.rows > 0 ? pBInfo->pRes : NULL;
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
|
|
@ -2161,9 +2161,6 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
|
|||
if (COLUMN_TYPE_TAG == pCol->colType) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_COL);
|
||||
}
|
||||
if (TSDB_SUPER_TABLE == pCol->tableType) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TABLE);
|
||||
}
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
|
|
@ -176,10 +176,22 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
|||
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
||||
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
|
||||
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
|
||||
if (WINDOW_TYPE_STATE == pWindow->winType || (!streamQuery && WINDOW_TYPE_SESSION == pWindow->winType)) {
|
||||
return false;
|
||||
if (WINDOW_TYPE_INTERVAL == pWindow->winType) {
|
||||
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
||||
if (WINDOW_TYPE_STATE == pWindow->winType) {
|
||||
if (!streamQuery) {
|
||||
return stbSplHasMultiTbScan(streamQuery, pNode);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (WINDOW_TYPE_SESSION == pWindow->winType) {
|
||||
if (!streamQuery) {
|
||||
return stbSplHasMultiTbScan(streamQuery, pNode);
|
||||
} else {
|
||||
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
||||
}
|
||||
}
|
||||
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
||||
}
|
||||
case QUERY_NODE_LOGIC_PLAN_SORT:
|
||||
return stbSplHasMultiTbScan(streamQuery, pNode);
|
||||
|
@ -477,11 +489,64 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo
|
|||
return code;
|
||||
}
|
||||
|
||||
static void splSetTableScanType(SLogicNode* pNode, EScanType scanType) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||
((SScanLogicNode*)pNode)->scanType = scanType;
|
||||
} else {
|
||||
if (1 == LIST_LENGTH(pNode->pChildren)) {
|
||||
splSetTableScanType((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), scanType);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
SLogicNode* pWindow = pInfo->pSplitNode;
|
||||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0);
|
||||
|
||||
SNodeList* pMergeKeys = NULL;
|
||||
int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
||||
(SNode*)splCreateScanSubplan(pCxt, pChild, SPLIT_FLAG_STABLE_SPLIT));
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
splSetTableScanType(pChild, SCAN_TYPE_TABLE_MERGE);
|
||||
++(pCxt->groupId);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
||||
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
|
||||
} else {
|
||||
nodesDestroyList(pMergeKeys);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
if (pCxt->pPlanCxt->streamQuery) {
|
||||
return stbSplSplitSessionForStream(pCxt, pInfo);
|
||||
} else {
|
||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
return stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitStateForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
if (pCxt->pPlanCxt->streamQuery) {
|
||||
return stbSplSplitStateForStream(pCxt, pInfo);
|
||||
} else {
|
||||
return stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -511,6 +576,8 @@ static int32_t stbSplSplitWindowForMergeTable(SSplitContext* pCxt, SStableSplitI
|
|||
return stbSplSplitInterval(pCxt, pInfo);
|
||||
case WINDOW_TYPE_SESSION:
|
||||
return stbSplSplitSession(pCxt, pInfo);
|
||||
case WINDOW_TYPE_STATE:
|
||||
return stbSplSplitState(pCxt, pInfo);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -34,3 +34,13 @@ TEST_F(PlanSessionTest, selectFunc) {
|
|||
// select function along with the columns of select row, and with SESSION clause
|
||||
run("SELECT MAX(c1), c2 FROM t1 SESSION(ts, 10s)");
|
||||
}
|
||||
|
||||
TEST_F(PlanSessionTest, stable) {
|
||||
useDb("root", "test");
|
||||
|
||||
// select function for SESSION clause
|
||||
run("SELECT MAX(c1), MIN(c1) FROM st1 SESSION(ts, 10s)");
|
||||
// select function along with the columns of select row, and with SESSION clause
|
||||
run("SELECT MAX(c1), c2 FROM st1 SESSION(ts, 10s)");
|
||||
run("SELECT count(ts) FROM st1 PARTITION BY c1 SESSION(ts, 10s)");
|
||||
}
|
||||
|
|
|
@ -40,3 +40,12 @@ TEST_F(PlanStateTest, selectFunc) {
|
|||
// select function along with the columns of select row, and with STATE_WINDOW clause
|
||||
run("SELECT MAX(c1), c2 FROM t1 STATE_WINDOW(c3)");
|
||||
}
|
||||
|
||||
TEST_F(PlanStateTest, stable) {
|
||||
useDb("root", "test");
|
||||
|
||||
// select function for STATE_WINDOW clause
|
||||
run("SELECT MAX(c1), MIN(c1) FROM st1 STATE_WINDOW(c2)");
|
||||
// select function along with the columns of select row, and with STATE_WINDOW clause
|
||||
run("SELECT MAX(c1), c2 FROM st1 STATE_WINDOW(c2)");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue