refactor(stream): injection error.

This commit is contained in:
Haojun Liao 2025-01-18 01:26:49 +08:00
parent 6274eea7c4
commit c93fe999ba
4 changed files with 17 additions and 2 deletions

View File

@ -255,6 +255,9 @@ int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
// injection error
void streamMetaFreeTQDuringScanWalError(STQ* pTq);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
// tq-mq

View File

@ -161,8 +161,12 @@ void tqClose(STQ* pTq) {
taosHashCleanup(pTq->pOffset);
taosMemoryFree(pTq->path);
tqMetaClose(pTq);
qDebug("vgId:%d end to close tq", pTq->pStreamMeta != NULL ? pTq->pStreamMeta->vgId : -1);
#if 0
streamMetaFreeTQDuringScanWalError(pTq);
#endif
qDebug("vgId:%d end to close tq", vgId);
taosMemoryFree(pTq);
}

View File

@ -462,3 +462,11 @@ int32_t doScanWalAsync(STQ* pTq, bool ckPause) {
return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
}
void streamMetaFreeTQDuringScanWalError(STQ* pTq) {
SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam));
p->metaId = pTq->pStreamMeta->rid;
p->numOfTasks = 0;
doStartScanWal(p, 0);
}

View File

@ -14,4 +14,4 @@ void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId)
// the checkpoint interval should be 60s, and the next checkpoint req should be issued by mnode
taosMsleep(65*1000);
}
}