fix(query): check return value.

This commit is contained in:
Haojun Liao 2024-07-29 01:42:30 +08:00
parent 18f5b323c9
commit 878d77fc7f
7 changed files with 63 additions and 54 deletions

View File

@ -682,13 +682,13 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);
// fill-history task // fill-history task
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration); void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
// checkpoint related // checkpoint related
void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId); void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId);
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId); int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId);
int32_t streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId); void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId);
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId); bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId);
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal); void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal);
void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask); void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);

View File

@ -36,7 +36,7 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce
streamTaskOpenAllUpstreamInput(pTask); streamTaskOpenAllUpstreamInput(pTask);
streamTaskResetUpstreamStageInfo(pTask); streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask); (void) streamSetupScheduleTrigger(pTask);
SCheckpointInfo *pChkInfo = &pTask->chkInfo; SCheckpointInfo *pChkInfo = &pTask->chkInfo;
tqSetRestoreVersionInfo(pTask); tqSetRestoreVersionInfo(pTask);

View File

@ -169,7 +169,7 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
} }
dataRsp.common.blockNum = 0; dataRsp.common.blockNum = 0;
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset); (void) tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset);
tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf,
req.reqId); req.reqId);
@ -187,8 +187,8 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq*
char buf1[TSDB_OFFSET_LEN] = {0}; char buf1[TSDB_OFFSET_LEN] = {0};
char buf2[TSDB_OFFSET_LEN] = {0}; char buf2[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset); (void) tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset);
tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset); (void) tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset);
tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
vgId, pReq->consumerId, pReq->epoch, ((SMqDataRspCommon*)pRsp)->blockNum, buf1, buf2, pReq->reqId); vgId, pReq->consumerId, pReq->epoch, ((SMqDataRspCommon*)pRsp)->blockNum, buf1, buf2, pReq->reqId);
@ -412,7 +412,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); (void) tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
@ -745,7 +745,7 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
} }
streamTaskResetUpstreamStageInfo(pTask); streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask); (void) streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo; SCheckpointInfo* pChkInfo = &pTask->chkInfo;
tqSetRestoreVersionInfo(pTask); tqSetRestoreVersionInfo(pTask);
@ -801,8 +801,11 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
if (done) { if (done) {
qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer, qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer,
pStep2Range->maxVer, 0.0); pStep2Range->maxVer, 0.0);
streamTaskPutTranstateIntoInputQ(pTask); int32_t code = streamTaskPutTranstateIntoInputQ(pTask); // todo: msg lost.
streamExecTask(pTask); // exec directly if (code) {
qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code));
}
(void) streamExecTask(pTask); // exec directly
} else { } else {
STimeWindow* pWindow = &pTask->dataRange.window; STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64
@ -811,7 +814,10 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
pStreamTask->id.idStr); pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow); int32_t code = streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow);
if (code) {
tqError("s-task:%s level:%d failed to set step2 param", id, pTask->info.taskLevel);
}
int64_t dstVer = pStep2Range->minVer; int64_t dstVer = pStep2Range->minVer;
pTask->chkInfo.nextProcessVer = dstVer; pTask->chkInfo.nextProcessVer = dstVer;
@ -820,12 +826,12 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE); pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE);
/*int8_t status = */ streamTaskSetSchedStatusInactive(pTask); (void) streamTaskSetSchedStatusInactive(pTask);
// now the fill-history task starts to scan data from wal files. // now the fill-history task starts to scan data from wal files.
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
tqScanWalAsync(pTq, false); (void) tqScanWalAsync(pTq, false);
} }
} }
} }
@ -949,11 +955,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pTask->streamTaskId.taskId, pTask->id.idStr); pTask->streamTaskId.taskId, pTask->id.idStr);
tqDebug("s-task:%s fill-history task set status to be dropping", id); tqDebug("s-task:%s fill-history task set status to be dropping", id);
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0); code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return -1; return code; // todo: handle failure
} }
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
@ -971,15 +977,14 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
// extracted submit data from wal files for all tasks // extracted submit data from wal files for all tasks
if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
tqScanWal(pTq); return tqScanWal(pTq);
return 0;
} }
int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
// let's continue scan data in the wal files // let's continue scan data in the wal files
if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) { if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) {
tqScanWalAsync(pTq, false); (void) tqScanWalAsync(pTq, false); // it's ok to failed
} }
return code; return code;
@ -1044,7 +1049,11 @@ int32_t tqStreamProgressRetrieveReq(STQ *pTq, SRpcMsg *pMsg) {
pRsp->subFetchIdx = req.subFetchIdx; pRsp->subFetchIdx = req.subFetchIdx;
pRsp->vgId = req.vgId; pRsp->vgId = req.vgId;
pRsp->streamId = req.streamId; pRsp->streamId = req.streamId;
tSerializeStreamProgressRsp(pRsp, sizeof(SStreamProgressRsp) + sizeof(SMsgHead), pRsp); code = tSerializeStreamProgressRsp(pRsp, sizeof(SStreamProgressRsp) + sizeof(SMsgHead), pRsp);
if (code) {
goto _OVER;
}
SRpcMsg rsp = {.info = pMsg->info, .code = 0}; SRpcMsg rsp = {.info = pMsg->info, .code = 0};
rsp.pCont = pRspBuf; rsp.pCont = pRspBuf;
pRspBuf = NULL; pRspBuf = NULL;
@ -1079,18 +1088,18 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return code; return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (!vnodeIsRoleLeader(pTq->pVnode)) { if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
} }
if (!pTq->pVnode->restored) { if (!pTq->pVnode->restored) {
@ -1098,9 +1107,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
", transId:%d s-task:0x%x ignore it", ", transId:%d s-task:0x%x ignore it",
vgId, req.checkpointId, req.transId, req.taskId); vgId, req.checkpointId, req.transId, req.taskId);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode
} }
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
@ -1110,7 +1119,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
" transId:%d it may have been destroyed", " transId:%d it may have been destroyed",
vgId, req.taskId, req.checkpointId, req.transId); vgId, req.taskId, req.checkpointId, req.transId);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1123,13 +1132,13 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS; // todo retry handle error
} }
// todo save the checkpoint failed info // todo save the checkpoint failed info
taosThreadMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask).state; ETaskStatus status = streamTaskGetStatus(pTask).state;
if (req.mndTrigger == 1) { if (req.mndTrigger == 1) {
@ -1137,13 +1146,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpointId:%" PRId64 ", set it failure", tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpointId:%" PRId64 ", set it failure",
pTask->id.idStr, req.checkpointId); pTask->id.idStr, req.checkpointId);
taosThreadMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else { } else {
@ -1162,7 +1170,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
" transId:%d already handled, ignore msg and continue process checkpoint", " transId:%d already handled, ignore msg and continue process checkpoint",
pTask->id.idStr, checkpointId, req.transId); pTask->id.idStr, checkpointId, req.transId);
taosThreadMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1171,19 +1179,24 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
" transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId); " transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId);
taosThreadMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
streamProcessCheckpointSourceReq(pTask, &req); code = streamProcessCheckpointSourceReq(pTask, &req);
taosThreadMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
if (code) {
qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId, tstrerror(code));
return code;
}
if (req.mndTrigger) { if (req.mndTrigger) {
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ", pTask->id.idStr, qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ", pTask->id.idStr,
@ -1198,13 +1211,13 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return code; return TSDB_CODE_SUCCESS;
} }
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return code; return TSDB_CODE_SUCCESS;
} }
// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task // downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task

View File

@ -80,7 +80,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
return tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg); return tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg);
} }
int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE; int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
if (numOfTicks <= 0) { if (numOfTicks <= 0) {
numOfTicks = 1; numOfTicks = 1;
@ -91,10 +91,10 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
// add ref for task // add ref for task
SStreamTask* p = NULL; SStreamTask* p = NULL;
int32_t code = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, &p); int32_t code = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, &p);
if (p == NULL) { if (p == NULL || code != 0) {
stError("s-task:0x%x failed to acquire task, status:%s, not exec scan-history data", pTask->id.taskId, stError("s-task:0x%x failed to acquire task, status:%s, not exec scan-history data", pTask->id.taskId,
streamTaskGetStatus(pTask).name); streamTaskGetStatus(pTask).name);
return TSDB_CODE_SUCCESS; return;
} }
pTask->schedHistoryInfo.numOfTicks = numOfTicks; pTask->schedHistoryInfo.numOfTicks = numOfTicks;
@ -109,8 +109,6 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
} }
return TSDB_CODE_SUCCESS;
} }
int32_t streamTaskStartScanHistory(SStreamTask* pTask) { int32_t streamTaskStartScanHistory(SStreamTask* pTask) {

View File

@ -766,8 +766,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) { int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) {
SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq)); SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno;
return -1;
} }
pReq->head.vgId = vgId; pReq->head.vgId = vgId;
@ -1089,11 +1088,10 @@ int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeChec
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) { void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) {
pTask->chkInfo.pActiveInfo->transId = transId; pTask->chkInfo.pActiveInfo->transId = transId;
pTask->chkInfo.pActiveInfo->activeId = checkpointId; pTask->chkInfo.pActiveInfo->activeId = checkpointId;
pTask->chkInfo.pActiveInfo->failedId = checkpointId; pTask->chkInfo.pActiveInfo->failedId = checkpointId;
return TSDB_CODE_SUCCESS;
} }
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) { int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {

View File

@ -270,7 +270,7 @@ int32_t streamTaskRestoreStatus(SStreamTask* pTask) {
code = TSDB_CODE_FAILED; // failed to restore the status, since it is not in pause status code = TSDB_CODE_FAILED; // failed to restore the status, since it is not in pause status
} }
(void)taosThreadMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
return code; return code;
} }