diff --git a/include/util/taoserror.h b/include/util/taoserror.h index c75425425b..5aafdac3cd 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -359,6 +359,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_STREAM_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x03F4) #define TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB TAOS_DEF_ERROR_CODE(0, 0x03F5) #define TSDB_CODE_MND_TOO_MANY_STREAMS TAOS_DEF_ERROR_CODE(0, 0x03F6) +#define TSDB_CODE_MND_INVALID_TARGET_TABLE TAOS_DEF_ERROR_CODE(0, 0x03F7) // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bf9168ac8f..99327d2776 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -696,6 +696,12 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; goto _OVER; } + + if (pStream->targetStbUid == streamObj.targetStbUid) { + mError("Cannot write the same stable as other stream:%s", pStream->name); + terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE; + goto _OVER; + } } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index a34836959c..ae33da66b9 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -611,6 +611,13 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p tqUpdateTbUidList(pVnode->pTq, tbUids, false); } + // process + ret = tsdbDoRetention(pVnode->pTsdb, ttlReq.timestamp); + if (ret) goto end; + + ret = smaDoRetention(pVnode->pSma, ttlReq.timestamp); + if (ret) goto end; + end: taosArrayDestroy(tbUids); return ret; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index af247b6e65..fc4669785c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2984,11 +2984,14 @@ static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode* return TSDB_CODE_SUCCESS; } - if (!pCxt->createStream && (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) || - TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER))) { + if (!pCxt->createStream && TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); } + if (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER)) { + return TSDB_CODE_SUCCESS; + } + // interp FILL clause if (NULL == pInterval) { return TSDB_CODE_SUCCESS; @@ -6126,11 +6129,15 @@ static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStream pReq->createStb = STREAM_CREATE_STABLE_TRUE; pReq->targetStbUid = 0; return TSDB_CODE_SUCCESS; - } else { + } else if (TSDB_CODE_SUCCESS == code) { if (isTagDef(pStmt->pTags)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Table already exist: %s", pStmt->targetTabName); } + if (TSDB_SUPER_TABLE != (*pMeta)->tableType) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Stream can only be written to super table"); + } pReq->createStb = STREAM_CREATE_STABLE_FALSE; pReq->targetStbUid = (*pMeta)->suid; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index fecdb4d32a..bd07fb3728 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -291,6 +291,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_MUST_BE_DELETED, "Stream must be dropped first") TAOS_DEFINE_ERROR(TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB, "Stream temporarily does not support source db having replica > 1") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STREAMS, "Too many streams") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TARGET_TABLE, "Cannot write the same stable as other stream") // mnode-sma TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")