From fdf977f9355c658e2a5670ac20e46ffd68c1590c Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 6 Aug 2022 11:30:06 +0800 Subject: [PATCH 1/2] fix: add checks for stream query --- source/libs/parser/src/parTranslater.c | 56 ++++++++++++++++++-------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index db14a4f1c3..280e84990c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1254,19 +1254,20 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p return TSDB_CODE_SUCCESS; } if (isSelectStmt(pCxt->pCurrStmt)) { - // select percentile() without from clause is also valid - if (NULL == ((SSelectStmt*)pCxt->pCurrStmt)->pFromTable) { - return TSDB_CODE_SUCCESS; - } - SNode* pTable = ((SSelectStmt*)pCxt->pCurrStmt)->pFromTable; - if (QUERY_NODE_REAL_TABLE == nodeType(pTable) && - (TSDB_CHILD_TABLE == ((SRealTableNode*)pTable)->pMeta->tableType || - TSDB_NORMAL_TABLE == ((SRealTableNode*)pTable)->pMeta->tableType)) { - return TSDB_CODE_SUCCESS; - } + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE, + "%s is only supported in single table query", pFunc->functionName); } - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE, - "%s is only supported in single table query", pFunc->functionName); + SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt; + SNode* pTable = pSelect->pFromTable; + // select percentile() without from clause is also valid + if ((NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) || + (TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType && + TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) || + NULL != pSelect->pPartitionByList) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE, + "%s is only supported in single table query", pFunc->functionName); + } + return TSDB_CODE_SUCCESS; } static bool isStar(SNode* pNode) { @@ -2509,9 +2510,31 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } -static int32_t translateStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) { +static bool isPartitionByTbname(SNodeList* pPartitionByList) { + if (1 != LIST_LENGTH(pPartitionByList)) { + return false; + } + SNode* pPartKey = nodesListGetNode(pPartitionByList, 0); + return QUERY_NODE_FUNCTION != nodeType(pPartKey) || FUNCTION_TYPE_TBNAME != ((SFunctionNode*)pPartKey)->funcType; +} + +static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* pSelect) { + if (!pCxt->createStream) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && + !isPartitionByTbname(pSelect->pPartitionByList)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + } + return TSDB_CODE_SUCCESS; +} + +static int32_t translateStateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { + SStateWindowNode* pState = (SStateWindowNode*)pSelect->pWindow; nodesWalkExprPostOrder(pState->pExpr, checkStateExpr, pCxt); - // todo check for "function not support for state_window" + if (TSDB_CODE_SUCCESS == pCxt->errCode) { + pCxt->errCode = checkStateWindowForStream(pCxt, pSelect); + } return pCxt->errCode; } @@ -2522,14 +2545,13 @@ static int32_t translateSessionWindow(STranslateContext* pCxt, SSessionWindowNod if (PRIMARYKEY_TIMESTAMP_COL_ID != pSession->pCol->colId) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SESSION_COL); } - // todo check for "function not support for session" return TSDB_CODE_SUCCESS; } static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { switch (nodeType(pSelect->pWindow)) { case QUERY_NODE_STATE_WINDOW: - return translateStateWindow(pCxt, (SStateWindowNode*)pSelect->pWindow); + return translateStateWindow(pCxt, pSelect); case QUERY_NODE_SESSION_WINDOW: return translateSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow); case QUERY_NODE_INTERVAL_WINDOW: @@ -4708,7 +4730,7 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt } } - return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) { From 5750525f9a66e5f5441a465abd094529615e987d Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 6 Aug 2022 15:25:53 +0800 Subject: [PATCH 2/2] fix: condition race error --- source/libs/parser/src/parTranslater.c | 2 +- source/libs/planner/src/planner.c | 7 +++++-- source/libs/scheduler/src/schTask.c | 3 ++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 280e84990c..9971f20d3d 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1253,7 +1253,7 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p if (!fmIsRepeatScanFunc(pFunc->funcId)) { return TSDB_CODE_SUCCESS; } - if (isSelectStmt(pCxt->pCurrStmt)) { + if (!isSelectStmt(pCxt->pCurrStmt)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE, "%s is only supported in single table query", pFunc->functionName); } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 1f531b0708..c1296982e0 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -85,7 +85,7 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown } int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) { - planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.groupId, groupId); + planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.queryId, groupId); return setSubplanExecutionNode(subplan->pNode, groupId, pSource); } @@ -104,7 +104,10 @@ static void clearSubplanExecutionNode(SPhysiNode* pNode) { FOREACH(pChild, pNode->pChildren) { clearSubplanExecutionNode((SPhysiNode*)pChild); } } -void qClearSubplanExecutionNode(SSubplan* pSubplan) { clearSubplanExecutionNode(pSubplan->pNode); } +void qClearSubplanExecutionNode(SSubplan* pSubplan) { + planDebug("QID:0x%" PRIx64 " clear subplan execution node, groupId:%d", pSubplan->id.queryId, pSubplan->id.groupId); + clearSubplanExecutionNode(pSubplan->pNode); +} int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) { if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index ecd0253e1c..978a72cf34 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -284,7 +284,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { for (int32_t i = 0; i < parentNum; ++i) { SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i); - int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1); SCH_LOCK(SCH_WRITE, &parent->planLock); SDownstreamSourceNode source = { @@ -298,6 +297,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); SCH_UNLOCK(SCH_WRITE, &parent->planLock); + int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1); + if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) { SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId); SCH_ERR_RET(schLaunchTask(pJob, parent));