fix(stream): fix deadlock in transfer state.

This commit is contained in:
Haojun Liao 2023-11-02 18:07:36 +08:00
parent 0a227e807f
commit 706f1e4744
2 changed files with 6 additions and 13 deletions

View File

@ -329,6 +329,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// start the task state transfer procedure.
char* p = NULL;
streamTaskGetStatus(pStreamTask, &p);
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// update the scan data range for source task.
stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
@ -347,9 +348,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask);
// 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
// pause, since the pause allowed attribute is not set yet.
streamTaskResume(pStreamTask); // todo refactor: use streamTaskResume.
// 3. resume the state of stream task, after this function, the stream task will run immediately.
streamTaskResume(pStreamTask);
stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
@ -357,12 +357,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 5. save to disk
streamMetaWLock(pMeta);
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
streamMetaWUnLock(pMeta);
// 7. pause allowed.
streamTaskEnablePause(pStreamTask);
// 6. pause allowed.
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);

View File

@ -260,13 +260,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
}
}
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
while (pMeta->streamBackend == NULL) {
taosMsleep(100);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
if (pMeta->streamBackend == NULL) {
while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId)) == NULL) {
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
}
taosMsleep(100);
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);