diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0b6a8b3f1b..6f9ff1ec97 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -480,6 +480,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY, QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC, + QUERY_NODE_RESET_STREAM_STMT, } ENodeType; typedef struct { @@ -3913,6 +3914,15 @@ typedef struct { int32_t tSerializeSMResumeStreamReq(void* buf, int32_t bufLen, const SMResumeStreamReq* pReq); int32_t tDeserializeSMResumeStreamReq(void* buf, int32_t bufLen, SMResumeStreamReq* pReq); +typedef struct { + char name[TSDB_STREAM_FNAME_LEN]; + int8_t igNotExists; + int8_t igUntreated; +} SMResetStreamReq; + +int32_t tSerializeSMResetStreamReq(void* buf, int32_t bufLen, const SMResetStreamReq* pReq); +int32_t tDeserializeSMResetStreamReq(void* buf, int32_t bufLen, SMResetStreamReq* pReq); + typedef struct { char name[TSDB_TABLE_FNAME_LEN]; char stb[TSDB_TABLE_FNAME_LEN]; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 93bfe306b6..e325d42ecf 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -261,6 +261,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUDIT, "audit", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONFIG, "init-config", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_RESET_STREAM, "reset-stream", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 8eb30b8184..d8584711a5 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -595,6 +595,12 @@ typedef struct SResumeStreamStmt { bool ignoreUntreated; } SResumeStreamStmt; +typedef struct SResetStreamStmt { + ENodeType type; + char streamName[TSDB_TABLE_NAME_LEN]; + bool ignoreNotExists; +} SResetStreamStmt; + typedef struct SCreateFunctionStmt { ENodeType type; bool orReplace; diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 2193c7983f..aae0c013f7 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -12018,6 +12018,43 @@ _exit: return code; } +int32_t tSerializeSMResetStreamReq(void *buf, int32_t bufLen, const SMResetStreamReq *pReq) { + SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; + tEncoderInit(&encoder, buf, bufLen); + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->name)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists)); + tEndEncode(&encoder); + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMResetStreamReq(void *buf, int32_t bufLen, SMResetStreamReq *pReq) { + SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + + tDecoderInit(&decoder, buf, bufLen); + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->name)); + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists)); + tEndDecode(&decoder); + +_exit: + tDecoderClear(&decoder); + return code; +} + int32_t tEncodeMqSubTopicEp(void **buf, const SMqSubTopicEp *pTopicEp) { int32_t tlen = 0; tlen += taosEncodeString(buf, pTopicEp->topic); diff --git a/source/common/test/msgTypeTable.ini b/source/common/test/msgTypeTable.ini index 0325183d8b..e31f21fd27 100644 --- a/source/common/test/msgTypeTable.ini +++ b/source/common/test/msgTypeTable.ini @@ -438,6 +438,7 @@ TDMT_STREAM_DROP = 1053 TDMT_STREAM_DROP_RSP = 1054 TDMT_STREAM_RETRIEVE_TRIGGER = 1055 TDMT_STREAM_RETRIEVE_TRIGGER_RSP = 1056 +TDMT_MND_RESET_STREAM = 1057 TDMT_SYNC_TIMEOUT = 1537 TDMT_SYNC_TIMEOUT_RSP = 1538 TDMT_SYNC_TIMEOUT_ELECTION = 1539 diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 8e595f76c9..869d68ed94 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -182,6 +182,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TB_WITH_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_RESET_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_ANAL_ALGO, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3bee82e3e7..963f69de64 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -52,6 +52,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter); static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq); static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq); +static int32_t mndProcessResetStreamReq(SRpcMsg *pReq); static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger); static int32_t mndProcessNodeCheck(SRpcMsg *pReq); @@ -130,6 +131,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq); + mndSetMsgHandle(pMnode, TDMT_MND_RESET_STREAM, mndProcessResetStreamReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream); @@ -1898,6 +1900,35 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } +static int32_t mndProcessResetStreamReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SStreamObj *pStream = NULL; + int32_t code = 0; + + if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { + return code; + } + + SMResetStreamReq resetReq = {0}; + if (tDeserializeSMResetStreamReq(pReq->pCont, pReq->contLen, &resetReq) < 0) { + TAOS_RETURN(TSDB_CODE_INVALID_MSG); + } + + code = mndAcquireStream(pMnode, resetReq.name, &pStream); + if (pStream == NULL || code != 0) { + if (resetReq.igNotExists) { + mInfo("stream:%s, not exist, not pause stream", resetReq.name); + return 0; + } else { + mError("stream:%s not exist, failed to pause stream", resetReq.name); + TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST); + } + } + + //todo(liao hao jun) + return TSDB_CODE_ACTION_IN_PROGRESS; +} + static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 6d4d89607f..369e6de85b 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -193,6 +193,8 @@ const char* nodesNodeName(ENodeType type) { return "PauseStreamStmt"; case QUERY_NODE_RESUME_STREAM_STMT: return "ResumeStreamStmt"; + case QUERY_NODE_RESET_STREAM_STMT: + return "ResetStreamStmt"; case QUERY_NODE_BALANCE_VGROUP_STMT: return "BalanceVgroupStmt"; case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: @@ -7287,6 +7289,32 @@ static int32_t jsonToDropStreamStmt(const SJson* pJson, void* pObj) { return code; } +static const char* jkResetStreamStmtStreamName = "StreamName"; +static const char* jkResetStreamStmtIgnoreNotExists = "IgnoreNotExists"; + +static int32_t resetStreamStmtToJson(const void* pObj, SJson* pJson) { + const SResetStreamStmt* pNode = (const SResetStreamStmt*)pObj; + + int32_t code = tjsonAddStringToObject(pJson, jkResetStreamStmtStreamName, pNode->streamName); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkResetStreamStmtIgnoreNotExists, pNode->ignoreNotExists); + } + + return code; +} + +static int32_t jsonToResetStreamStmt(const SJson* pJson, void* pObj) { + SResetStreamStmt* pNode = (SResetStreamStmt*)pObj; + + int32_t code = tjsonGetStringValue(pJson, jkResetStreamStmtStreamName, pNode->streamName); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkResetStreamStmtIgnoreNotExists, &pNode->ignoreNotExists); + } + + return code; +} + + static const char* jkMergeVgroupStmtVgroupId1 = "VgroupId1"; static const char* jkMergeVgroupStmtVgroupId2 = "VgroupId2"; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 7beaeaa46c..5c22ff00a3 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -615,6 +615,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { case QUERY_NODE_RESUME_STREAM_STMT: code = makeNode(type, sizeof(SResumeStreamStmt), &pNode); break; + case QUERY_NODE_RESET_STREAM_STMT: + code = makeNode(type, sizeof(SResetStreamStmt), &pNode); + break; case QUERY_NODE_BALANCE_VGROUP_STMT: code = makeNode(type, sizeof(SBalanceVgroupStmt), &pNode); break; @@ -1480,6 +1483,7 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_DROP_STREAM_STMT: // no pointer field case QUERY_NODE_PAUSE_STREAM_STMT: // no pointer field case QUERY_NODE_RESUME_STREAM_STMT: // no pointer field + case QUERY_NODE_RESET_STREAM_STMT: // no pointer field case QUERY_NODE_BALANCE_VGROUP_STMT: // no pointer field case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: // no pointer field case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: // no pointer field diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index e69a3da4a9..c6d617fc3d 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -301,6 +301,7 @@ SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName); SNode* createPauseStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName); SNode* createResumeStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, bool ignoreUntreated, SToken* pStreamName); +SNode* createResetStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName); SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId); SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId); SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 63eb09d509..0354d3c590 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -783,6 +783,7 @@ cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); } cmd ::= PAUSE STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createPauseStreamStmt(pCxt, A, &B); } cmd ::= RESUME STREAM exists_opt(A) ignore_opt(C) stream_name(B). { pCxt->pRootNode = createResumeStreamStmt(pCxt, A, C, &B); } +cmd ::= RESET STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createResetStreamStmt(pCxt, A, &B); } %type col_list_opt { SNodeList* } %destructor col_list_opt { nodesDestroyList($$); } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index a13472620b..b78d6baede 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -3714,6 +3714,20 @@ _err: return NULL; } +SNode* createResetStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName) { + CHECK_PARSER_STATUS(pCxt); + CHECK_NAME(checkStreamName(pCxt, pStreamName)); + SPauseStreamStmt* pStmt = NULL; + pCxt->errCode = nodesMakeNode(QUERY_NODE_RESET_STREAM_STMT, (SNode**)&pStmt); + CHECK_MAKE_NODE(pStmt); + COPY_STRING_FORM_ID_TOKEN(pStmt->streamName, pStreamName); + pStmt->ignoreNotExists = ignoreNotExists; + return (SNode*)pStmt; +_err: + return NULL; +} + + SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId) { CHECK_PARSER_STATUS(pCxt); SKillStmt* pStmt = NULL; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f191080512..951fabb0f6 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -12346,6 +12346,16 @@ static int32_t translateResumeStream(STranslateContext* pCxt, SResumeStreamStmt* return buildCmdMsg(pCxt, TDMT_MND_RESUME_STREAM, (FSerializeFunc)tSerializeSMResumeStreamReq, &req); } +static int32_t translateResetStream(STranslateContext* pCxt, SResetStreamStmt* pStmt) { + SMResetStreamReq req = {0}; + SName name; + int32_t code = tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->streamName, strlen(pStmt->streamName)); + if (TSDB_CODE_SUCCESS != code) return code; + (void)tNameGetFullDbName(&name, req.name); + req.igNotExists = pStmt->ignoreNotExists; + return buildCmdMsg(pCxt, TDMT_MND_RESET_STREAM, (FSerializeFunc)tSerializeSMResetStreamReq, &req); +} + static int32_t validateCreateView(STranslateContext* pCxt, SCreateViewStmt* pStmt) { if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery) && QUERY_NODE_SET_OPERATOR != nodeType(pStmt->pQuery)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type"); @@ -13380,6 +13390,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { case QUERY_NODE_RESUME_STREAM_STMT: code = translateResumeStream(pCxt, (SResumeStreamStmt*)pNode); break; + case QUERY_NODE_RESET_STREAM_STMT: + code = translateResetStream(pCxt, (SResetStreamStmt*)pNode); + break; case QUERY_NODE_CREATE_FUNCTION_STMT: code = translateCreateFunction(pCxt, (SCreateFunctionStmt*)pNode); break;