refactor: do some internal refactor.
This commit is contained in:
parent
42c11e9e36
commit
079f6358aa
|
@ -749,6 +749,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
int32_t sqlLen = 0;
|
int32_t sqlLen = 0;
|
||||||
const char *pMsg = "create stream tasks on dnodes";
|
const char *pMsg = "create stream tasks on dnodes";
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
SCMCreateStreamReq createReq = {0};
|
SCMCreateStreamReq createReq = {0};
|
||||||
|
@ -788,11 +789,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
if (createReq.sql != NULL) {
|
if (createReq.sql != NULL) {
|
||||||
sqlLen = strlen(createReq.sql);
|
sqlLen = strlen(createReq.sql);
|
||||||
sql = taosMemoryMalloc(sqlLen + 1);
|
sql = taosMemoryMalloc(sqlLen + 1);
|
||||||
if (sql == NULL) {
|
TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
|
||||||
code = terrno;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
memset(sql, 0, sqlLen + 1);
|
memset(sql, 0, sqlLen + 1);
|
||||||
memcpy(sql, createReq.sql, sqlLen);
|
memcpy(sql, createReq.sql, sqlLen);
|
||||||
}
|
}
|
||||||
|
|
|
@ -171,25 +171,24 @@ int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnCo
|
||||||
*pTrans1 = NULL;
|
*pTrans1 = NULL;
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
|
STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
mError("failed to build trans:%s, reason: %s", name, tstrerror(terrno));
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, p->id);
|
mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, p->id);
|
||||||
|
|
||||||
mndTransSetDbName(p, pStream->sourceDb, pStream->targetSTbName);
|
mndTransSetDbName(p, pStream->sourceDb, pStream->targetSTbName);
|
||||||
if (mndTransCheckConflict(pMnode, p) != 0) {
|
if ((code = mndTransCheckConflict(pMnode, p)) != 0) {
|
||||||
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
|
||||||
mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno));
|
mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno));
|
||||||
mndTransDrop(p);
|
mndTransDrop(p);
|
||||||
return terrno;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
*pTrans1 = p;
|
*pTrans1 = p;
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
|
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
|
||||||
|
|
Loading…
Reference in New Issue