refactor(stream): create sim env for stream processing.

This commit is contained in:
Haojun Liao 2023-11-08 11:04:16 +08:00
parent a0d81734f1
commit 809dd064a7
1 changed files with 38 additions and 0 deletions

View File

@ -1920,7 +1920,45 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
streamMetaWUnLock(pMeta);
} else {
streamMetaWUnLock(pMeta);
#if 0
tqStartStreamTaskAsync(pTq, true);
#else
// For debug purpose.
// the following procedure consume many CPU resource, result in the re-election of leader
// with high probability. So we employ it as a test case for the stream processing framework, with
// checkpoint/restart/nodeUpdate etc.
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
int32_t code = streamMetaReopen(pMeta);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
tqResetStreamTaskStatus(pTq);
tqStartStreamTaskAsync(pTq, false);
} else {
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
streamMetaWUnLock(pMeta);
#endif
}
}