fix(stream): fix race condition.
parent fc96ec6bae
commit 68b003c804

@@ -405,7 +405,7 @@ typedef struct SStreamMeta {
   int32_t  vgId;
   int64_t  stage;
   bool     leader;
-  int8_t   taskWillbeLaunched;
+  int8_t   taskStartedByNodeUpdate;
   SRWLatch lock;
   int32_t  walScanCounter;
   void*    streamBackend;
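
The struct change above only renames taskWillbeLaunched to taskStartedByNodeUpdate; the substance of the fix is that the flag is now only touched while the adjacent SRWLatch is held. Below is a minimal sketch of that access pattern, assuming TDengine's latch helpers from tlockfree.h; the trimmed struct and helper are illustrative stand-ins, not code from this commit.

#include "tlockfree.h"  // SRWLatch, taosWLockLatch/taosWUnLockLatch (assumed TDengine util header)

typedef struct {
  SRWLatch lock;                     // plays the role of SStreamMeta.lock
  int8_t   taskStartedByNodeUpdate;  // plays the role of the renamed field
} SMetaFlagSketch;

// Writers take the write latch first, so any reader that also holds the latch
// never observes the flag mid-update.
static void setStartedByNodeUpdate(SMetaFlagSketch* pMeta, int8_t flag) {
  taosWLockLatch(&pMeta->lock);
  pMeta->taskStartedByNodeUpdate = flag;
  taosWUnLockLatch(&pMeta->lock);
}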

@@ -1830,15 +1830,16 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
   int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
   int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskSet);
   if (updateTasks < numOfTasks) {
-    pMeta->taskWillbeLaunched = 1;
-
-    tqDebug("vgId:%d closed tasks:%d, unclosed:%d", vgId, updateTasks, (numOfTasks - updateTasks));
+    pMeta->taskStartedByNodeUpdate = 1;
+    tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
+            updateTasks, (numOfTasks - updateTasks));
     taosWUnLockLatch(&pMeta->lock);
   } else {
     taosHashClear(pMeta->pUpdateTaskSet);

     if (!pTq->pVnode->restored) {
-      tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId);
+      tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
+      pMeta->taskStartedByNodeUpdate = 0;
       taosWUnLockLatch(&pMeta->lock);
     } else {
       tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);

@@ -1860,14 +1861,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
       }

       if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
-        vInfo("vgId:%d, restart all stream tasks", vgId);
-        tqStartStreamTasks(pTq);
+        vInfo("vgId:%d restart all stream tasks", vgId);
+        tqCheckAndRunStreamTaskAsync(pTq);
       } else {
         vInfo("vgId:%d, follower node not start stream tasks", vgId);
       }

-      pMeta->taskWillbeLaunched = 0;
       taosWUnLockLatch(&pMeta->lock);
     }
   }
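
The two hunks above change how tqProcessTaskUpdateReq finishes a nodeEp update: while pMeta->lock is held it either records that the tasks will be started later once the last task has processed the update (taskStartedByNodeUpdate = 1), clears that flag when the vnode has not finished restoring so that vnodeRestoreFinish takes over, or restarts the tasks asynchronously through tqCheckAndRunStreamTaskAsync on a leader vnode. A condensed sketch of that decision follows; the helper name is made up, logging and unrelated branches are omitted, and the fields and calls are the ones visible in the hunks.

// Condensed sketch of the tail of tqProcessTaskUpdateReq; the caller is
// assumed to already hold pMeta->lock for writing.
static void finishNodeEpUpdateSketch(STQ* pTq, SStreamMeta* pMeta) {
  int32_t numOfTasks  = streamMetaGetNumOfTasks(pMeta);
  int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskSet);

  if (updateTasks < numOfTasks) {
    pMeta->taskStartedByNodeUpdate = 1;    // not everyone updated yet: the last one triggers the start
  } else {
    taosHashClear(pMeta->pUpdateTaskSet);
    if (!pTq->pVnode->restored) {
      pMeta->taskStartedByNodeUpdate = 0;  // vnode not restored: hand the start to vnodeRestoreFinish
    } else if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
      tqCheckAndRunStreamTaskAsync(pTq);   // leader: restart all stream tasks asynchronously
    }                                      // followers do not start stream tasks
  }
  taosWUnLockLatch(&pMeta->lock);          // released in every branch, as in the diff
}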

@@ -551,13 +551,8 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
   walApplyVer(pVnode->pWal, commitIdx);
   pVnode->restored = true;

-  if (pVnode->pTq->pStreamMeta->taskWillbeLaunched) {
-    vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
-    return;
-  }
-
   taosWLockLatch(&pVnode->pTq->pStreamMeta->lock);
-  if (pVnode->pTq->pStreamMeta->taskWillbeLaunched) {
+  if (pVnode->pTq->pStreamMeta->taskStartedByNodeUpdate) {
     vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
     taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
     return;
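
The vnodeRestoreFinish hunk is where the race named in the commit title appears to be closed: the old code read the flag before taking the stream-meta latch, so the nodeEp-update thread could flip it between that unlocked check and the decision to launch the tasks; the new code takes the write latch first and only then decides whether to return. A minimal sketch of the resulting pattern, using only the fields and helpers shown in the hunks (the launch step itself is elided):

// Sketch: the flag is inspected only while the write latch is held, so the
// "skip the launch" decision cannot race with tqProcessTaskUpdateReq.
static void restoreFinishSketch(SStreamMeta* pMeta) {
  taosWLockLatch(&pMeta->lock);
  if (pMeta->taskStartedByNodeUpdate) {
    // the nodeEp-update thread owns the restart; nothing to do here
    taosWUnLockLatch(&pMeta->lock);
    return;
  }
  // ... launch the stream tasks here (elided) ...
  taosWUnLockLatch(&pMeta->lock);
}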

@@ -118,8 +118,8 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {

 // check status
 static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
-  SDataRange*  pRange = &pTask->dataRange;
-  STimeWindow* pWindow = &pRange->window;
+  SDataRange*  pRange = &pTask->dataRange;
+  STimeWindow* pWindow = &pRange->window;

   SStreamTaskCheckReq req = {
       .streamId = pTask->id.streamId,

@@ -136,10 +136,10 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
     req.downstreamTaskId = pTask->fixedDispatcher.taskId;
     pTask->checkReqId = req.reqId;

-    stDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
-            "-%" PRId64 ", stage:%"PRId64" req:0x%" PRIx64,
-            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
-            pWindow->skey, pWindow->ekey, req.stage, req.reqId);
+    stDebug("s-task:%s stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
+            " window:%" PRId64 "-%" PRId64 " req:0x%" PRIx64,
+            pTask->id.idStr, req.reqId, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer,
+            pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);

     streamSendCheckMsg(pTask, &req, pTask->fixedDispatcher.nodeId, &pTask->fixedDispatcher.epSet);
   } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {

@@ -158,8 +158,8 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
       taosArrayPush(pTask->checkReqIds, &req.reqId);
       req.downstreamNodeId = pVgInfo->vgId;
       req.downstreamTaskId = pVgInfo->taskId;
-      stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, stage:%" PRId64,
-              pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i, req.stage);
+      stDebug("s-task:%s (vgId:%d) stage:%" PRId64 "check downstream task:0x%x (vgId:%d) (shuffle), idx:%d",
+              pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
       streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
     }
   } else {

@@ -907,6 +907,12 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
     stDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
     return;
   }
+//
+//  if (pTask->status.downstreamReady == 0) {
+//    ASSERT(pTask->execInfo.start == 0);
+//    stDebug("s-task:%s in check downstream procedure, abort and paused", pTask->id.idStr);
+//    break;
+//  }

   const char* pStatus = streamGetTaskStatusStr(status);
   stDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId);