enh(stream): check the existence for snode.
This commit is contained in:
parent
29ba8ffb2d
commit
6170bb5c1c
|
@ -133,6 +133,7 @@ int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgr
|
|||
|
||||
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj** pStream);
|
||||
bool mndStreamNodeIsUpdated(SMnode *pMnode);
|
||||
bool mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb);
|
||||
|
||||
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
|
||||
int32_t mndProcessStreamHb(SRpcMsg *pReq);
|
||||
|
|
|
@ -801,12 +801,23 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
if (createReq.sql != NULL) {
|
||||
sqlLen = strlen(createReq.sql);
|
||||
sql = taosMemoryMalloc(sqlLen + 1);
|
||||
sql = taosStrdup(createReq.sql);
|
||||
TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
|
||||
}
|
||||
|
||||
memset(sql, 0, sqlLen + 1);
|
||||
memcpy(sql, createReq.sql, sqlLen);
|
||||
SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
|
||||
if (pSourceDb == NULL) {
|
||||
code = terrno;
|
||||
mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
|
||||
tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
bool snodeCheckSucc = mndCheckForSnode(pMnode, pSourceDb);
|
||||
mndReleaseDb(pMnode, pSourceDb);
|
||||
if (!snodeCheckSucc) {
|
||||
code = TSDB_CODE_SNODE_NOT_DEPLOYED;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
// build stream obj from request
|
||||
|
|
|
@ -1497,6 +1497,30 @@ bool mndStreamNodeIsUpdated(SMnode *pMnode) {
|
|||
return updated;
|
||||
}
|
||||
|
||||
bool mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SSnodeObj *pObj = NULL;
|
||||
|
||||
if (pSrcDb->cfg.replications == 1) {
|
||||
return true;
|
||||
} else {
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pObj);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return true;
|
||||
}
|
||||
|
||||
mError("snode not existed when trying to create stream in db with multiple replica");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t seed = 0;
|
||||
static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) {
|
||||
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
|
||||
|
|
|
@ -629,7 +629,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
|||
}
|
||||
break;
|
||||
case TDMT_STREAM_TASK_DEPLOY: {
|
||||
int32_t code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len);
|
||||
code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
goto _err;
|
||||
|
|
|
@ -56,7 +56,7 @@ void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void*
|
|||
}
|
||||
}
|
||||
|
||||
stDebug("vgId:%d start %s tmr succ", vgId, pMsg);
|
||||
stTrace("vgId:%d start %s tmr succ", vgId, pMsg);
|
||||
}
|
||||
|
||||
void streamTmrStop(tmr_h tmrId) {
|
||||
|
|
Loading…
Reference in New Issue