fix(stream): handle return value.

This commit is contained in:
Haojun Liao 2024-09-29 18:43:26 +08:00
parent 2f65886d01
commit a23e6c2ce9
1 changed files with 12 additions and 5 deletions

View File

@ -73,6 +73,10 @@ int32_t tqScanWal(STQ* pTq) {
}
static void doStartScanWal(void* param, void* tmrId) {
int32_t vgId = 0;
STQ* pTq = NULL;
int32_t code = 0;
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, pParam->metaId);
@ -82,19 +86,22 @@ static void doStartScanWal(void* param, void* tmrId) {
return;
}
int32_t vgId = pMeta->vgId;
STQ* pTq = pMeta->ahandle;
vgId = pMeta->vgId;
pTq = pMeta->ahandle;
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
pTq->pVnode->restored);
int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
if (code) {
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
}
taosReleaseRef(streamMetaId, pParam->metaId);
code = taosReleaseRef(streamMetaId, pParam->metaId);
if (code) {
tqError("vgId:% failed to release ref for streamMeta, rid:%" PRId64, vgId, pParam->metaId, tstrerror(code));
}
taosMemoryFree(pParam);
}