fix(stream): scan wal data files.

This commit is contained in:
Haojun Liao 2024-01-03 18:16:18 +08:00
parent 185fa75376
commit 4138692cea
2 changed files with 5 additions and 4 deletions

View File

@ -1066,15 +1066,15 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
if (taskId == STREAM_EXEC_T_EXTRACT_WAL_DATA) { // all tasks are extracted submit data from the wal
if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { // all tasks are extracted submit data from the wal
tqScanWal(pTq);
return 0;
}
int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
if(code == 0 && taskId > 0){
// let's continue scan data in the wal files
if(code == 0 && pReq->reqType > 0){
tqScanWalAsync(pTq, false);
}

View File

@ -728,6 +728,7 @@ static void doStreamExecTaskHelper(void* param, void* tmrId) {
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
// release the task ref count
pTask->status.schedIdleTime = 0; // clear the idle time
streamMetaReleaseTask(pTask->pMeta, pTask);
}