diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index b9c0589dc5..a137086d21 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -51,12 +51,15 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { taskId = replaceStreamTaskId(pTask); } - pTask->pState = streamStateOpen(pMeta->path, pTask, false, -1, -1); - if (pTask->pState == NULL) { - tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId); - return -1; - } else { - tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); + // sink task does not need the pState + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + pTask->pState = streamStateOpen(pMeta->path, pTask, false, -1, -1); + if (pTask->pState == NULL) { + tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId); + return -1; + } else { + tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); + } } if (pTask->info.fillHistory) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 87fb615d5f..d42a3b545a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -338,7 +338,7 @@ int32_t rebuildFromRemoteChkp_rsync(const char* key, char* chkptPath, int64_t ch if (taosIsDir(defaultPath)) { taosRemoveDir(defaultPath); taosMulMkDir(defaultPath); - stDebug("clear local default dir before download checkpoint data:%s succ", defaultPath); + stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath); } code = streamTaskDownloadCheckpointData(key, chkptPath); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 51a6542256..e0822f60e7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -250,7 +250,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) taosThreadMutexLock(&pMeta->backendMutex); void** ppBackend = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key)); - if (ppBackend != NULL && *ppBackend != NULL) { + if ((ppBackend != NULL) && (*ppBackend != NULL)) { taskDbAddRef(*ppBackend); STaskDbWrapper* pBackend = *ppBackend; @@ -258,7 +258,6 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) pTask->pBackend = pBackend; taosThreadMutexUnlock(&pMeta->backendMutex); - stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend); return 0; }