test(stream): lock the update procedure.
This commit is contained in:
parent
92045f5485
commit
2de0f6e971
|
@ -106,14 +106,15 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
return rsp.code;
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
// streamMetaWUnLock(pMeta);
|
||||
|
||||
// todo for test purpose
|
||||
// the following two functions should not be executed within the scope of meta lock to avoid deadlock
|
||||
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
||||
streamTaskResetStatus(pTask);
|
||||
|
||||
// continue after lock the meta again
|
||||
streamMetaWLock(pMeta);
|
||||
// streamMetaWLock(pMeta);
|
||||
|
||||
SStreamTask** ppHTask = NULL;
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
|
@ -166,65 +167,17 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
streamMetaWUnLock(pMeta);
|
||||
} else {
|
||||
if (!restored) {
|
||||
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag",
|
||||
vgId);
|
||||
tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId);
|
||||
pMeta->startInfo.tasksWillRestart = 0;
|
||||
streamMetaWUnLock(pMeta);
|
||||
} else {
|
||||
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks);
|
||||
#if 1
|
||||
#if 0
|
||||
// for test purpose, to trigger the leader election
|
||||
taosMSleep(5000);
|
||||
#endif
|
||||
tqStreamTaskStartAsync(pMeta, cb, true);
|
||||
streamMetaWUnLock(pMeta);
|
||||
#else
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// For debug purpose.
|
||||
// the following procedure consume many CPU resource, result in the re-election of leader
|
||||
// with high probability. So we employ it as a test case for the stream processing framework, with
|
||||
// checkpoint/restart/nodeUpdate etc.
|
||||
while (1) {
|
||||
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
|
||||
if (startVal == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
|
||||
taosMsleep(500);
|
||||
}
|
||||
|
||||
while (streamMetaTaskInTimer(pMeta)) {
|
||||
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
||||
taosMsleep(100);
|
||||
}
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
int32_t code = streamMetaReopen(pMeta);
|
||||
if (code != 0) {
|
||||
tqError("vgId:%d failed to reopen stream meta", vgId);
|
||||
streamMetaWUnLock(pMeta);
|
||||
taosArrayDestroy(req.pNodeList);
|
||||
return -1;
|
||||
}
|
||||
|
||||
streamMetaInitBackend(pMeta);
|
||||
|
||||
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
|
||||
tqError("vgId:%d failed to load stream tasks", vgId);
|
||||
streamMetaWUnLock(pMeta);
|
||||
taosArrayDestroy(req.pNodeList);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
||||
tqInfo("vgId:%d start all stream tasks after all being updated", vgId);
|
||||
tqStreamTaskResetStatus(pTq->pStreamMeta);
|
||||
tqStartStreamTaskAsync(pTq, false);
|
||||
} else {
|
||||
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
|
||||
}
|
||||
streamMetaWUnLock(pMeta);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue