add reset stream

This commit is contained in:
54liuyao 2024-12-27 15:11:19 +08:00
parent 7a6eb1d969
commit 0bf9fb1e31
13 changed files with 148 additions and 0 deletions

View File

@ -480,6 +480,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY,
QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC,
QUERY_NODE_RESET_STREAM_STMT,
} ENodeType;
typedef struct {
@ -3913,6 +3914,15 @@ typedef struct {
int32_t tSerializeSMResumeStreamReq(void* buf, int32_t bufLen, const SMResumeStreamReq* pReq);
int32_t tDeserializeSMResumeStreamReq(void* buf, int32_t bufLen, SMResumeStreamReq* pReq);
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
int8_t igNotExists;
int8_t igUntreated;
} SMResetStreamReq;
int32_t tSerializeSMResetStreamReq(void* buf, int32_t bufLen, const SMResetStreamReq* pReq);
int32_t tDeserializeSMResetStreamReq(void* buf, int32_t bufLen, SMResetStreamReq* pReq);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];

View File

@ -261,6 +261,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUDIT, "audit", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CONFIG, "init-config", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RESET_STREAM, "reset-stream", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8

View File

@ -595,6 +595,12 @@ typedef struct SResumeStreamStmt {
bool ignoreUntreated;
} SResumeStreamStmt;
typedef struct SResetStreamStmt {
ENodeType type;
char streamName[TSDB_TABLE_NAME_LEN];
bool ignoreNotExists;
} SResetStreamStmt;
typedef struct SCreateFunctionStmt {
ENodeType type;
bool orReplace;

View File

@ -12018,6 +12018,43 @@ _exit:
return code;
}
int32_t tSerializeSMResetStreamReq(void *buf, int32_t bufLen, const SMResetStreamReq *pReq) {
SEncoder encoder = {0};
int32_t code = 0;
int32_t lino;
int32_t tlen;
tEncoderInit(&encoder, buf, bufLen);
TAOS_CHECK_EXIT(tStartEncode(&encoder));
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->name));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
tEndEncode(&encoder);
_exit:
if (code) {
tlen = code;
} else {
tlen = encoder.pos;
}
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMResetStreamReq(void *buf, int32_t bufLen, SMResetStreamReq *pReq) {
SDecoder decoder = {0};
int32_t code = 0;
int32_t lino;
tDecoderInit(&decoder, buf, bufLen);
TAOS_CHECK_EXIT(tStartDecode(&decoder));
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->name));
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
tEndDecode(&decoder);
_exit:
tDecoderClear(&decoder);
return code;
}
int32_t tEncodeMqSubTopicEp(void **buf, const SMqSubTopicEp *pTopicEp) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic);

View File

@ -438,6 +438,7 @@ TDMT_STREAM_DROP = 1053
TDMT_STREAM_DROP_RSP = 1054
TDMT_STREAM_RETRIEVE_TRIGGER = 1055
TDMT_STREAM_RETRIEVE_TRIGGER_RSP = 1056
TDMT_MND_RESET_STREAM = 1057
TDMT_SYNC_TIMEOUT = 1537
TDMT_SYNC_TIMEOUT_RSP = 1538
TDMT_SYNC_TIMEOUT_ELECTION = 1539

View File

@ -182,6 +182,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TB_WITH_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RESET_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_ANAL_ALGO, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -52,6 +52,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq);
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
@ -130,6 +131,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_RESET_STREAM, mndProcessResetStreamReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
@ -1898,6 +1900,35 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static int32_t mndProcessResetStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
int32_t code = 0;
if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
return code;
}
SMResetStreamReq resetReq = {0};
if (tDeserializeSMResetStreamReq(pReq->pCont, pReq->contLen, &resetReq) < 0) {
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
}
code = mndAcquireStream(pMnode, resetReq.name, &pStream);
if (pStream == NULL || code != 0) {
if (resetReq.igNotExists) {
mInfo("stream:%s, not exist, not pause stream", resetReq.name);
return 0;
} else {
mError("stream:%s not exist, failed to pause stream", resetReq.name);
TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
}
}
//todo(liao hao jun)
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;

View File

@ -193,6 +193,8 @@ const char* nodesNodeName(ENodeType type) {
return "PauseStreamStmt";
case QUERY_NODE_RESUME_STREAM_STMT:
return "ResumeStreamStmt";
case QUERY_NODE_RESET_STREAM_STMT:
return "ResetStreamStmt";
case QUERY_NODE_BALANCE_VGROUP_STMT:
return "BalanceVgroupStmt";
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
@ -7287,6 +7289,32 @@ static int32_t jsonToDropStreamStmt(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkResetStreamStmtStreamName = "StreamName";
static const char* jkResetStreamStmtIgnoreNotExists = "IgnoreNotExists";
static int32_t resetStreamStmtToJson(const void* pObj, SJson* pJson) {
const SResetStreamStmt* pNode = (const SResetStreamStmt*)pObj;
int32_t code = tjsonAddStringToObject(pJson, jkResetStreamStmtStreamName, pNode->streamName);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkResetStreamStmtIgnoreNotExists, pNode->ignoreNotExists);
}
return code;
}
static int32_t jsonToResetStreamStmt(const SJson* pJson, void* pObj) {
SResetStreamStmt* pNode = (SResetStreamStmt*)pObj;
int32_t code = tjsonGetStringValue(pJson, jkResetStreamStmtStreamName, pNode->streamName);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkResetStreamStmtIgnoreNotExists, &pNode->ignoreNotExists);
}
return code;
}
static const char* jkMergeVgroupStmtVgroupId1 = "VgroupId1";
static const char* jkMergeVgroupStmtVgroupId2 = "VgroupId2";

View File

@ -615,6 +615,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
case QUERY_NODE_RESUME_STREAM_STMT:
code = makeNode(type, sizeof(SResumeStreamStmt), &pNode);
break;
case QUERY_NODE_RESET_STREAM_STMT:
code = makeNode(type, sizeof(SResetStreamStmt), &pNode);
break;
case QUERY_NODE_BALANCE_VGROUP_STMT:
code = makeNode(type, sizeof(SBalanceVgroupStmt), &pNode);
break;
@ -1480,6 +1483,7 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_DROP_STREAM_STMT: // no pointer field
case QUERY_NODE_PAUSE_STREAM_STMT: // no pointer field
case QUERY_NODE_RESUME_STREAM_STMT: // no pointer field
case QUERY_NODE_RESET_STREAM_STMT: // no pointer field
case QUERY_NODE_BALANCE_VGROUP_STMT: // no pointer field
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: // no pointer field
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: // no pointer field

View File

@ -301,6 +301,7 @@ SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);
SNode* createPauseStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);
SNode* createResumeStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, bool ignoreUntreated, SToken* pStreamName);
SNode* createResetStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId);
SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId);
SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt);

View File

@ -783,6 +783,7 @@ cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
cmd ::= PAUSE STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createPauseStreamStmt(pCxt, A, &B); }
cmd ::= RESUME STREAM exists_opt(A) ignore_opt(C) stream_name(B). { pCxt->pRootNode = createResumeStreamStmt(pCxt, A, C, &B); }
cmd ::= RESET STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createResetStreamStmt(pCxt, A, &B); }
%type col_list_opt { SNodeList* }
%destructor col_list_opt { nodesDestroyList($$); }

View File

@ -3714,6 +3714,20 @@ _err:
return NULL;
}
SNode* createResetStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName) {
CHECK_PARSER_STATUS(pCxt);
CHECK_NAME(checkStreamName(pCxt, pStreamName));
SPauseStreamStmt* pStmt = NULL;
pCxt->errCode = nodesMakeNode(QUERY_NODE_RESET_STREAM_STMT, (SNode**)&pStmt);
CHECK_MAKE_NODE(pStmt);
COPY_STRING_FORM_ID_TOKEN(pStmt->streamName, pStreamName);
pStmt->ignoreNotExists = ignoreNotExists;
return (SNode*)pStmt;
_err:
return NULL;
}
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId) {
CHECK_PARSER_STATUS(pCxt);
SKillStmt* pStmt = NULL;

View File

@ -12346,6 +12346,16 @@ static int32_t translateResumeStream(STranslateContext* pCxt, SResumeStreamStmt*
return buildCmdMsg(pCxt, TDMT_MND_RESUME_STREAM, (FSerializeFunc)tSerializeSMResumeStreamReq, &req);
}
static int32_t translateResetStream(STranslateContext* pCxt, SResetStreamStmt* pStmt) {
SMResetStreamReq req = {0};
SName name;
int32_t code = tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->streamName, strlen(pStmt->streamName));
if (TSDB_CODE_SUCCESS != code) return code;
(void)tNameGetFullDbName(&name, req.name);
req.igNotExists = pStmt->ignoreNotExists;
return buildCmdMsg(pCxt, TDMT_MND_RESET_STREAM, (FSerializeFunc)tSerializeSMResetStreamReq, &req);
}
static int32_t validateCreateView(STranslateContext* pCxt, SCreateViewStmt* pStmt) {
if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery) && QUERY_NODE_SET_OPERATOR != nodeType(pStmt->pQuery)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type");
@ -13380,6 +13390,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_RESUME_STREAM_STMT:
code = translateResumeStream(pCxt, (SResumeStreamStmt*)pNode);
break;
case QUERY_NODE_RESET_STREAM_STMT:
code = translateResetStream(pCxt, (SResetStreamStmt*)pNode);
break;
case QUERY_NODE_CREATE_FUNCTION_STMT:
code = translateCreateFunction(pCxt, (SCreateFunctionStmt*)pNode);
break;