refactor: adjust the name of stream automatically created by sma
This commit is contained in:
parent
c0639d3dc4
commit
f0b8386b63
|
@ -125,7 +125,7 @@ typedef struct SMnode {
|
|||
} SMnode;
|
||||
|
||||
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
|
||||
int64_t mndGenerateUid(char *name, int32_t len);
|
||||
int64_t mndGenerateUid(const char *name, int32_t len);
|
||||
|
||||
int32_t mndAcquireRpcRef(SMnode *pMnode);
|
||||
void mndReleaseRpcRef(SMnode *pMnode);
|
||||
|
|
|
@ -624,7 +624,7 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
|
|||
}
|
||||
|
||||
// Note: uid 0 is reserved
|
||||
int64_t mndGenerateUid(char *name, int32_t len) {
|
||||
int64_t mndGenerateUid(const char *name, int32_t len) {
|
||||
int32_t hashval = MurmurHash3_32(name, len);
|
||||
do {
|
||||
int64_t us = taosGetTimestampUs();
|
||||
|
|
|
@ -479,7 +479,8 @@ static void mndDestroySmaObj(SSmaObj *pSmaObj) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb) {
|
||||
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb,
|
||||
const char *streamName) {
|
||||
SSmaObj smaObj = {0};
|
||||
memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN);
|
||||
|
@ -520,12 +521,12 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
}
|
||||
|
||||
SStreamObj streamObj = {0};
|
||||
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||
tstrncpy(streamObj.name, streamName, TSDB_STREAM_FNAME_LEN);
|
||||
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
tstrncpy(streamObj.targetDb, streamObj.sourceDb, TSDB_DB_FNAME_LEN);
|
||||
streamObj.createTime = taosGetTimestampMs();
|
||||
streamObj.updateTime = streamObj.createTime;
|
||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||
streamObj.uid = mndGenerateUid(streamName, strlen(streamName));
|
||||
streamObj.sourceDbUid = pDb->uid;
|
||||
streamObj.targetDbUid = pDb->uid;
|
||||
streamObj.version = 1;
|
||||
|
@ -590,7 +591,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
if (pTrans == NULL) goto _OVER;
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
mndTransSetSerial(pTrans);
|
||||
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
|
||||
mDebug("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
|
||||
|
||||
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
||||
|
@ -638,6 +639,14 @@ static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
|
||||
SName n;
|
||||
tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
streamName[0] = '1';
|
||||
streamName[1] = '.';
|
||||
strcpy(streamName + 2, tNameGetTableName(&n));
|
||||
}
|
||||
|
||||
static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
int32_t code = -1;
|
||||
|
@ -663,9 +672,12 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
pStream = mndAcquireStream(pMnode, createReq.name);
|
||||
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
mndGetStreamNameFromSmaName(streamName, createReq.name);
|
||||
|
||||
pStream = mndAcquireStream(pMnode, streamName);
|
||||
if (pStream != NULL) {
|
||||
mError("sma:%s, failed to create since stream:%s already exist", createReq.name, createReq.name);
|
||||
mError("sma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
|
||||
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
|
||||
goto _OVER;
|
||||
}
|
||||
|
@ -692,7 +704,7 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb);
|
||||
code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb, streamName);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
_OVER:
|
||||
|
@ -789,7 +801,10 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
|||
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
|
||||
SStreamObj *pStream = mndAcquireStream(pMnode, pSma->name);
|
||||
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
mndGetStreamNameFromSmaName(streamName, pSma->name);
|
||||
|
||||
SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
|
||||
if (pStream == NULL || pStream->smaId != pSma->uid) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
goto _OVER;
|
||||
|
@ -838,7 +853,10 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
|
|||
pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
|
||||
if (pVgroup == NULL) goto _OVER;
|
||||
|
||||
SStreamObj *pStream = mndAcquireStream(pMnode, pSma->name);
|
||||
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
mndGetStreamNameFromSmaName(streamName, pSma->name);
|
||||
|
||||
SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
|
||||
if (pStream != NULL && pStream->smaId == pSma->uid) {
|
||||
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
|
||||
|
|
Loading…
Reference in New Issue