fix(stream): fix error in execute task on a tranferred node.

This commit is contained in:
Haojun Liao 2023-08-16 15:24:24 +08:00
parent 993ae84398
commit d63758a1c5
4 changed files with 56 additions and 16 deletions

View File

@ -638,6 +638,8 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
int32_t streamTaskStop(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId);

View File

@ -530,6 +530,7 @@ static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *p
static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
SVnode *pVnode = pFsm->data;
int32_t vgId = pVnode->config.vgId;
SyncIndex appliedIdx = -1;
do {
@ -541,7 +542,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
} else {
vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
", applied-index:%" PRId64,
pVnode->config.vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
taosMsleep(10);
}
} while (true);
@ -550,14 +551,19 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
walApplyVer(pVnode->pWal, commitIdx);
pVnode->restored = true;
vInfo("vgId:%d, sync restore finished, start to restore stream tasks by replay wal", pVnode->config.vgId);
// start to restore all stream tasks
if (tsDisableStream) {
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId);
if (vnodeIsRoleLeader(pVnode)) {
vInfo("vgId:%d, sync restore finished, start to launch stream tasks", vgId);
// start to restore all stream tasks
if (tsDisableStream) {
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId);
} else {
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
tqCheckStreamStatus(pVnode->pTq);
}
} else {
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
tqCheckStreamStatus(pVnode->pTq);
vInfo("vgId:%d, sync restore finished, no launch stream tasks since not leader", vgId);
}
}

View File

@ -143,6 +143,10 @@ _err:
void streamMetaClose(SStreamMeta* pMeta) {
qDebug("start to close stream meta");
if (pMeta == NULL) {
return;
}
tdbAbort(pMeta->db, pMeta->txn);
tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pCheckpointDb);

View File

@ -405,11 +405,15 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
}
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
char buf[512] = {0};
EPSET_TO_STR(pEpSet, buf);
int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
for(int32_t i = 0; i < numOfUpstream; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i);
if (pInfo->nodeId == nodeId) {
epsetAssign(&pInfo->epSet, pEpSet);
qDebug("s-task:0x%x update the upstreamInfo, nodeId:%d newEpset:%s", pTask->id.taskId, nodeId, buf);
break;
}
}
@ -426,6 +430,9 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
}
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
char buf[512] = {0};
EPSET_TO_STR(pEpSet, buf);
int8_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
@ -435,16 +442,16 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
if (pVgInfo->vgId == nodeId) {
pVgInfo->epSet = *pEpSet;
qDebug("s-task:0x%x update the dispatch info, nodeId:%d", pTask->id.taskId, nodeId);
epsetAssign(&pVgInfo->epSet, pEpSet);
qDebug("s-task:0x%x update the dispatch info, nodeId:%d newEpset:%s", pTask->id.taskId, nodeId, buf);
break;
}
}
} else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher;
if (pDispatcher->nodeId == nodeId) {
pDispatcher->epSet = *pEpSet;
qDebug("s-task:0x%x update the dispatch info, nodeId:%d", pTask->id.taskId, nodeId);
epsetAssign(&pDispatcher->epSet, pEpSet);
qDebug("s-task:0x%x update the dispatch info, nodeId:%d newEpSet:%s", pTask->id.taskId, nodeId, buf);
}
} else {
// do nothing
@ -486,7 +493,12 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask)
taosArrayClear(pTask->checkReqIds);
taosArrayClear(pTask->pRspMsgList);
// reset the upstream task stage info
streamTaskResetUpstreamStageInfo(pTask);
pTask->status.downstreamReady = 0;
// todo: handle the case when the task is in fill-history (step 1) phase
streamSetStatusNormal(pTask);
taosWLockLatch(&pTask->pMeta->lock);
@ -494,20 +506,22 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask)
streamMetaCommit(pTask->pMeta);
taosWUnLockLatch(&pTask->pMeta->lock);
// qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id,
// pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus));
qDebug("s-task:%s vgId:%d restart completed", pTask->id.idStr, vgId);
// 3. start to check the downstream status
if (startTask) {
streamTaskCheckDownstreamTasks(pTask);
}
return 0;
}
int32_t doUpdateEpsetInfo(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
char buf[512] = {0};
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
epsetAssign(&pTask->info.epSet, pEpSet);
EPSET_TO_STR(pEpSet, buf)
qDebug("s-task:0x%x (vgId:%d) epset is updated %s", pTask->id.taskId, nodeId, buf);
}
// check for the dispath info and the upstream task info
@ -527,7 +541,21 @@ int32_t doUpdateEpsetInfo(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
for(int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
doUpdateEpsetInfo(pTask, pInfo->nodeId, &pInfo->newEp);
doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp);
}
return 0;
}
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
return;
}
int32_t size = taosArrayGetSize(pTask->pUpstreamInfoList);
for(int32_t i = 0; i < size; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
pInfo->stage = -1;
}
qDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
}