fix(stream): set the dest epset from mnode, and set the retry error code for trans.

This commit is contained in:
Haojun Liao 2024-03-05 13:28:34 +08:00
parent 2ac803bf3c
commit cdfbbc2985
4 changed files with 15 additions and 7 deletions

View File

@ -111,7 +111,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);

View File

@ -1811,7 +1811,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
pStream->name, pTrans->id); pStream->name, pTrans->id);
int32_t code = mndStreamSetUpdateEpsetAction(pStream, pChangeInfo, pTrans); int32_t code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);
// todo: not continue, drop all and retry again // todo: not continue, drop all and retry again
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -462,14 +462,22 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t doSetUpdateTaskAction(STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) { static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
void *pBuf = NULL; void *pBuf = NULL;
int32_t len = 0; int32_t len = 0;
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0); SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
terrno = code;
return code;
}
code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
} }
@ -478,14 +486,14 @@ static int32_t doSetUpdateTaskAction(STrans *pTrans, SStreamTask *pTask, SVgroup
} }
// build trans to update the epset // build trans to update the epset
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid); mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
taosWLockLatch(&pStream->lock); taosWLockLatch(&pStream->lock);
SStreamTaskIter *pIter = createStreamTaskIter(pStream); SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) { while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
int32_t code = doSetUpdateTaskAction(pTrans, pTask, pInfo); int32_t code = doSetUpdateTaskAction(pMnode, pTrans, pTask, pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);

View File

@ -39,7 +39,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
} }
// check for the dispath info and the upstream task info // check for the dispatch info and the upstream task info
int32_t level = pTask->info.taskLevel; int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) { if (level == TASK_LEVEL__SOURCE) {
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);