feat(stream): drop stream

This commit is contained in:
Liu Jicong 2022-06-16 10:29:30 +08:00
parent 69eb9daddb
commit 803249837a
2 changed files with 21 additions and 16 deletions

View File

@ -2008,7 +2008,7 @@ typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
uint64_t queryId;
int64_t useconds;
int64_t stime; // timestamp precision ms
int64_t stime; // timestamp precision ms
int64_t reqRid;
int32_t pid;
bool stableQuery;
@ -2257,8 +2257,8 @@ typedef struct {
} SMqVDeleteRsp;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
int64_t streamId;
char name[TSDB_STREAM_FNAME_LEN];
int8_t igNotExists;
} SMDropStreamTaskReq;
typedef struct {

View File

@ -278,8 +278,8 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
}
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
SStbObj *pStb = NULL;
SDbObj *pDb = NULL;
SStbObj *pStb = NULL;
SDbObj *pDb = NULL;
SMCreateStbReq createReq = {0};
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
@ -321,7 +321,6 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
goto _OVER;
}
if (mndCheckDbAuth(pMnode, user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}
@ -520,13 +519,19 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
/*SDbObj *pDb = NULL;*/
/*SUserObj *pUser = NULL;*/
SMDropStreamTaskReq dropStreamReq = *(SMDropStreamTaskReq *)pReq->pCont;
SMDropStreamTaskReq dropReq = *(SMDropStreamTaskReq *)pReq->pCont;
pStream = mndAcquireStream(pMnode, dropStreamReq.name);
pStream = mndAcquireStream(pMnode, dropReq.name);
if (pStream == NULL) {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
if (dropReq.igNotExists) {
mDebug("stream:%s, not exist, ignore not exist is set", dropReq.name);
code = 0;
goto DROP_STREAM_OVER;
} else {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
}
}
#if 0
@ -539,19 +544,19 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropStreamReq.name, terrstr());
return -1;
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
return code;
}
mDebug("trans:%d, used to drop stream:%s", pTrans->id, dropStreamReq.name);
mDebug("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropStreamReq.name, terrstr());
return -1;
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
return code;
}
DROP_STREAM_OVER:
return 0;
return code;
}
static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {