fix: add stream checking

This commit is contained in:
dapan1121 2025-03-20 11:48:17 +08:00
parent f0f517db71
commit 274cd443ae
3 changed files with 20 additions and 5 deletions

View File

@ -317,7 +317,9 @@ typedef struct SCtgVSubTablesCtx {
int32_t vgNum;
bool clonedVgroups;
SArray* pVgroups;
int32_t resCode;
int32_t resDoneNum;
SVSubTablesRsp* pResList;
int32_t resIdx;
} SCtgVSubTablesCtx;

View File

@ -3151,13 +3151,20 @@ int32_t ctgHandleGetVSubTablesRsp(SCtgTaskReq* tReq, int32_t reqType, const SDat
SCtgTask* pTask = tReq->pTask;
int32_t newCode = TSDB_CODE_SUCCESS;
SCtgVSubTablesCtx* pCtx = (SCtgVSubTablesCtx*)pTask->taskCtx;
int32_t resIdx = atomic_fetch_add_32(&pCtx->resIdx, 1);
CTG_ERR_JRET(ctgProcessRspMsg(pCtx->pResList + atomic_fetch_add_32(&pCtx->resIdx, 1), reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
if (atomic_load_32(&pCtx->resIdx) < pCtx->vgNum) {
CTG_RET(code);
code = ctgProcessRspMsg(pCtx->pResList + resIdx, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target);
if (code) {
pCtx->resCode = code;
}
int32_t doneNum = atomic_add_fetch_32(&pCtx->resDoneNum, 1);
if (doneNum < pCtx->vgNum) {
return code;
}
code = pCtx->resCode;
_return:
newCode = ctgHandleTaskEnd(pTask, code);

View File

@ -12045,6 +12045,12 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt
}
if (isVirtualTable(tableType) || (tableType == TSDB_SUPER_TABLE && pMeta->virtualStb)) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if ((STREAM_TRIGGER_WINDOW_CLOSE != pStmt->pOptions->triggerType) &&
!(STREAM_TRIGGER_AT_ONCE == pStmt->pOptions->triggerType && (NULL == pSelect->pWindow && NULL == pSelect->pEvery))) {
taosMemoryFree(pMeta);
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Not supported virtual table stream query or trigger mode");
}
if (0 == pStmt->pOptions->ignoreExpired) {
taosMemoryFree(pMeta);
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "For virtual table IGNORE EXPIRED must be 1");