refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-01-26 10:22:16 +08:00
parent bc7fe1fe6a
commit 9b3b03caab
4 changed files with 29 additions and 32 deletions

View File

@ -110,15 +110,15 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, co
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
int32_t mndProcessStreamHb(SRpcMsg *pReq);
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t initStreamNodeList(SMnode *pMnode);
int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated);
int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated);
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
#ifdef __cplusplus

View File

@ -1577,21 +1577,6 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter);
}
static int32_t mndPersistStreamLog(STrans *pTrans, SStreamObj *pStream, int8_t status) {
taosWLockLatch(&pStream->lock);
pStream->status = status;
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
taosWUnLockLatch(&pStream->lock);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
return 0;
}
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
@ -1647,7 +1632,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
// if nodeUpdate happened, not send pause trans
if (mndPauseStreamTasks(pMnode, pTrans, pStream) < 0) {
if (mndStreamSetPauseAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
@ -1655,12 +1640,18 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
}
// pause stream
if (mndPersistStreamLog(pTrans, pStream, STREAM_STATUS__PAUSE) < 0) {
taosWLockLatch(&pStream->lock);
pStream->status = STREAM_STATUS__PAUSE;
if (mndPersistTransLog(pStream, pTrans,SDB_STATUS_READY) < 0) {
taosWUnLockLatch(&pStream->lock);
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
taosWUnLockLatch(&pStream->lock);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@ -1724,7 +1715,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
// resume all tasks
if (mndResumeStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
if (mndStreamSetResumeAction(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
@ -1732,12 +1723,17 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
}
// resume stream
if (mndPersistStreamLog(pTrans, pStream, STREAM_STATUS__NORMAL) < 0) {
taosWLockLatch(&pStream->lock);
pStream->status = STREAM_STATUS__NORMAL;
if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY) < 0) {
taosWUnLockLatch(&pStream->lock);
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
taosWUnLockLatch(&pStream->lock);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@ -1862,7 +1858,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
pStream->name, pTrans->id);
int32_t code = createStreamUpdateTrans(pStream, pChangeInfo, pTrans);
int32_t code = mndStreamSetUpdateEpsetAction(pStream, pChangeInfo, pTrans);
// todo: not continue, drop all and retry again
if (code != TSDB_CODE_SUCCESS) {

View File

@ -256,7 +256,7 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status)
}
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode) {
int32_t retryCode) {
STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode};
return mndTransAppendRedoAction(pTrans, &action);
}
@ -354,8 +354,8 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
// todo extract method: traverse stream tasks
// build trans to update the epset
int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
mDebug("start to build stream:0x%" PRIx64 " tasks epset update", pStream->uid);
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);

View File

@ -143,7 +143,7 @@ int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t
}
}
static int32_t doResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
@ -200,14 +200,14 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
return num;
}
int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
int32_t size = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (doResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) {
if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) {
return -1;
}
@ -219,7 +219,7 @@ int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream
return 0;
}
static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
@ -250,7 +250,7 @@ static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pT
return 0;
}
int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SArray *tasks = pStream->tasks;
int32_t size = taosArrayGetSize(tasks);
@ -259,7 +259,8 @@ int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (doPauseStreamTask(pMnode, pTrans, pTask) < 0) {
if (doSetPauseAction(pMnode, pTrans, pTask) < 0) {
return -1;
}