diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index fd2aa47ef2..7064e48641 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -329,6 +329,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // start the task state transfer procedure. char* p = NULL; streamTaskGetStatus(pStreamTask, &p); + if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { // update the scan data range for source task. stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 @@ -347,9 +348,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); - // 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be - // pause, since the pause allowed attribute is not set yet. - streamTaskResume(pStreamTask); // todo refactor: use streamTaskResume. + // 3. resume the state of stream task, after this function, the stream task will run immediately. + streamTaskResume(pStreamTask); stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); @@ -357,12 +357,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 5. save to disk - streamMetaWLock(pMeta); pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL); - streamMetaWUnLock(pMeta); - // 7. pause allowed. - streamTaskEnablePause(pStreamTask); + // 6. pause allowed. if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) { SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 887e879934..76945f17a9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -260,13 +260,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { } } - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); - while (pMeta->streamBackend == NULL) { + while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId)) == NULL) { + stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); taosMsleep(100); - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); - if (pMeta->streamBackend == NULL) { - stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); - } } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);