refactor backend

This commit is contained in:
Yihao Deng 2024-07-04 12:13:17 +00:00
parent 92ab689c46
commit 0cdfae3a2c
2 changed files with 10 additions and 8 deletions

View File

@ -38,8 +38,7 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
// alloc
pReader = (SStreamStateReader*)taosMemoryCalloc(1, sizeof(SStreamStateReader));
if (pReader == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
@ -52,10 +51,9 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
SStreamSnapReader* pSnapReader = NULL;
if (streamSnapReaderOpen(meta, sver, chkpId, meta->path, &pSnapReader) == 0) {
if ((code = streamSnapReaderOpen(meta, sver, chkpId, meta->path, &pSnapReader)) == 0) {
pReader->complete = 1;
} else {
code = terrno;
taosMemoryFree(pReader);
goto _err;
}
@ -68,7 +66,7 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
_err:
tqError("vgId:%d, vnode %s snapshot reader failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
tstrerror(terrno));
tstrerror(code));
*ppReader = NULL;
return code;
}
@ -142,7 +140,7 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
if (taosMkDir(pTq->pStreamMeta->path) != 0) {
code = TAOS_SYSTEM_ERROR(errno);
tqError("vgId:%d, vnode %s snapshot writer failed to create directory %s since %s", TD_VID(pTq->pVnode),
STREAM_STATE_TRANSFER, pTq->pStreamMeta->path, tstrerror(terrno));
STREAM_STATE_TRANSFER, pTq->pStreamMeta->path, tstrerror(code));
goto _err;
}
@ -160,7 +158,7 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
_err:
tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
tstrerror(terrno));
tstrerror(code));
taosMemoryFree(pWriter);
*ppWriter = NULL;
return code;

View File

@ -398,6 +398,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
pMeta->bkdChkptMgt = bkdMgtCreate(tpath);
if (pMeta->bkdChkptMgt == NULL) {
goto _err;
}
taosThreadMutexInit(&pMeta->backendMutex, NULL);
return pMeta;
@ -413,9 +416,10 @@ _err:
if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt);
taosMemoryFree(pMeta);
stError("failed to open stream meta");
stError("failed to open stream meta, reason:%s", tstrerror(terrno));
return NULL;
}