fix(stream): fix deadlock in transfer state.

This commit is contained in:
Haojun Liao 2023-11-02 18:07:36 +08:00
parent 3aed8c29ae
commit 39175dc03b
2 changed files with 6 additions and 13 deletions

View File

@ -329,6 +329,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// start the task state transfer procedure. // start the task state transfer procedure.
char* p = NULL; char* p = NULL;
streamTaskGetStatus(pStreamTask, &p); streamTaskGetStatus(pStreamTask, &p);
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// update the scan data range for source task. // update the scan data range for source task.
stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
@ -347,9 +348,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask); streamTaskReloadState(pStreamTask);
// 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be // 3. resume the state of stream task, after this function, the stream task will run immediately.
// pause, since the pause allowed attribute is not set yet. streamTaskResume(pStreamTask);
streamTaskResume(pStreamTask); // todo refactor: use streamTaskResume.
stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
@ -357,12 +357,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 5. save to disk // 5. save to disk
streamMetaWLock(pMeta);
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL); pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
streamMetaWUnLock(pMeta);
// 7. pause allowed. // 6. pause allowed.
streamTaskEnablePause(pStreamTask);
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) { if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);

View File

@ -260,13 +260,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
} }
} }
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId)) == NULL) {
while (pMeta->streamBackend == NULL) { stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
taosMsleep(100); taosMsleep(100);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
if (pMeta->streamBackend == NULL) {
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
}
} }
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);