enh(stream): sink task does not initialize the backend.

This commit is contained in:
Haojun Liao 2024-05-28 11:34:04 +08:00
parent bd514fe4d3
commit cc318d7e2d
3 changed files with 11 additions and 9 deletions

View File

@ -51,12 +51,15 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
taskId = replaceStreamTaskId(pTask); taskId = replaceStreamTaskId(pTask);
} }
pTask->pState = streamStateOpen(pMeta->path, pTask, false, -1, -1); // sink task does not need the pState
if (pTask->pState == NULL) { if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId); pTask->pState = streamStateOpen(pMeta->path, pTask, false, -1, -1);
return -1; if (pTask->pState == NULL) {
} else { tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); return -1;
} else {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
} }
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {

View File

@ -338,7 +338,7 @@ int32_t rebuildFromRemoteChkp_rsync(const char* key, char* chkptPath, int64_t ch
if (taosIsDir(defaultPath)) { if (taosIsDir(defaultPath)) {
taosRemoveDir(defaultPath); taosRemoveDir(defaultPath);
taosMulMkDir(defaultPath); taosMulMkDir(defaultPath);
stDebug("clear local default dir before download checkpoint data:%s succ", defaultPath); stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath);
} }
code = streamTaskDownloadCheckpointData(key, chkptPath); code = streamTaskDownloadCheckpointData(key, chkptPath);

View File

@ -250,7 +250,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
taosThreadMutexLock(&pMeta->backendMutex); taosThreadMutexLock(&pMeta->backendMutex);
void** ppBackend = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key)); void** ppBackend = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
if (ppBackend != NULL && *ppBackend != NULL) { if ((ppBackend != NULL) && (*ppBackend != NULL)) {
taskDbAddRef(*ppBackend); taskDbAddRef(*ppBackend);
STaskDbWrapper* pBackend = *ppBackend; STaskDbWrapper* pBackend = *ppBackend;
@ -258,7 +258,6 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
pTask->pBackend = pBackend; pTask->pBackend = pBackend;
taosThreadMutexUnlock(&pMeta->backendMutex); taosThreadMutexUnlock(&pMeta->backendMutex);
stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend); stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
return 0; return 0;
} }