diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index a475cca52c..235cbbc5d5 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -317,7 +317,9 @@ typedef struct SCtgVSubTablesCtx { int32_t vgNum; bool clonedVgroups; SArray* pVgroups; - + + int32_t resCode; + int32_t resDoneNum; SVSubTablesRsp* pResList; int32_t resIdx; } SCtgVSubTablesCtx; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index cf60437508..e80d4611e9 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -3151,13 +3151,20 @@ int32_t ctgHandleGetVSubTablesRsp(SCtgTaskReq* tReq, int32_t reqType, const SDat SCtgTask* pTask = tReq->pTask; int32_t newCode = TSDB_CODE_SUCCESS; SCtgVSubTablesCtx* pCtx = (SCtgVSubTablesCtx*)pTask->taskCtx; + int32_t resIdx = atomic_fetch_add_32(&pCtx->resIdx, 1); - CTG_ERR_JRET(ctgProcessRspMsg(pCtx->pResList + atomic_fetch_add_32(&pCtx->resIdx, 1), reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); - - if (atomic_load_32(&pCtx->resIdx) < pCtx->vgNum) { - CTG_RET(code); + code = ctgProcessRspMsg(pCtx->pResList + resIdx, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target); + if (code) { + pCtx->resCode = code; } + int32_t doneNum = atomic_add_fetch_32(&pCtx->resDoneNum, 1); + if (doneNum < pCtx->vgNum) { + return code; + } + + code = pCtx->resCode; + _return: newCode = ctgHandleTaskEnd(pTask, code); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 75e01d1356..e1f7160d46 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -12045,6 +12045,12 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt } if (isVirtualTable(tableType) || (tableType == TSDB_SUPER_TABLE && pMeta->virtualStb)) { + SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; + if ((STREAM_TRIGGER_WINDOW_CLOSE != pStmt->pOptions->triggerType) && + !(STREAM_TRIGGER_AT_ONCE == pStmt->pOptions->triggerType && (NULL == pSelect->pWindow && NULL == pSelect->pEvery))) { + taosMemoryFree(pMeta); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Not supported virtual table stream query or trigger mode"); + } if (0 == pStmt->pOptions->ignoreExpired) { taosMemoryFree(pMeta); return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "For virtual table IGNORE EXPIRED must be 1");