diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5f322be99b..0d1e62cdbe 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -682,13 +682,13 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); // fill-history task int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); -int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration); +void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); // checkpoint related void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId); int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId); -int32_t streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId); +void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId); bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId); void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal); void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 5e536e5fbf..da4763e56e 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -36,7 +36,7 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce streamTaskOpenAllUpstreamInput(pTask); streamTaskResetUpstreamStageInfo(pTask); - streamSetupScheduleTrigger(pTask); + (void) streamSetupScheduleTrigger(pTask); SCheckpointInfo *pChkInfo = &pTask->chkInfo; tqSetRestoreVersionInfo(pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0ab420ce63..8a4571a4af 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -169,7 +169,7 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { } dataRsp.common.blockNum = 0; char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset); + (void) tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset); tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId); @@ -187,8 +187,8 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* char buf1[TSDB_OFFSET_LEN] = {0}; char buf2[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset); - tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset); + (void) tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset); + (void) tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset); tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId, pReq->consumerId, pReq->epoch, ((SMqDataRspCommon*)pRsp)->blockNum, buf1, buf2, pReq->reqId); @@ -412,7 +412,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); + (void) tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); @@ -745,7 +745,7 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV } streamTaskResetUpstreamStageInfo(pTask); - streamSetupScheduleTrigger(pTask); + (void) streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; tqSetRestoreVersionInfo(pTask); @@ -801,8 +801,11 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask if (done) { qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer, pStep2Range->maxVer, 0.0); - streamTaskPutTranstateIntoInputQ(pTask); - streamExecTask(pTask); // exec directly + int32_t code = streamTaskPutTranstateIntoInputQ(pTask); // todo: msg lost. + if (code) { + qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code)); + } + (void) streamExecTask(pTask); // exec directly } else { STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 @@ -811,7 +814,10 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask pStreamTask->id.idStr); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow); + int32_t code = streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow); + if (code) { + tqError("s-task:%s level:%d failed to set step2 param", id, pTask->info.taskLevel); + } int64_t dstVer = pStep2Range->minVer; pTask->chkInfo.nextProcessVer = dstVer; @@ -820,12 +826,12 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE); - /*int8_t status = */ streamTaskSetSchedStatusInactive(pTask); + (void) streamTaskSetSchedStatusInactive(pTask); // now the fill-history task starts to scan data from wal files. - int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); if (code == TSDB_CODE_SUCCESS) { - tqScanWalAsync(pTq, false); + (void) tqScanWalAsync(pTq, false); } } } @@ -949,11 +955,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pTask->streamTaskId.taskId, pTask->id.idStr); tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0); + code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0); streamMetaReleaseTask(pMeta, pTask); - return -1; + return code; // todo: handle failure } ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); @@ -971,15 +977,14 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // extracted submit data from wal files for all tasks if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { - tqScanWal(pTq); - return 0; + return tqScanWal(pTq); } int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); // let's continue scan data in the wal files if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) { - tqScanWalAsync(pTq, false); + (void) tqScanWalAsync(pTq, false); // it's ok to failed } return code; @@ -1044,7 +1049,11 @@ int32_t tqStreamProgressRetrieveReq(STQ *pTq, SRpcMsg *pMsg) { pRsp->subFetchIdx = req.subFetchIdx; pRsp->vgId = req.vgId; pRsp->streamId = req.streamId; - tSerializeStreamProgressRsp(pRsp, sizeof(SStreamProgressRsp) + sizeof(SMsgHead), pRsp); + code = tSerializeStreamProgressRsp(pRsp, sizeof(SStreamProgressRsp) + sizeof(SMsgHead), pRsp); + if (code) { + goto _OVER; + } + SRpcMsg rsp = {.info = pMsg->info, .code = 0}; rsp.pCont = pRspBuf; pRspBuf = NULL; @@ -1079,18 +1088,18 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs - return code; + return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode } tDecoderClear(&decoder); if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode } if (!pTq->pVnode->restored) { @@ -1098,9 +1107,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) ", transId:%d s-task:0x%x ignore it", vgId, req.checkpointId, req.transId, req.taskId); SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode } SStreamTask* pTask = NULL; @@ -1110,7 +1119,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) " transId:%d it may have been destroyed", vgId, req.taskId, req.checkpointId, req.transId); SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1123,13 +1132,13 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; // todo retry handle error } // todo save the checkpoint failed info - taosThreadMutexLock(&pTask->lock); + streamMutexLock(&pTask->lock); ETaskStatus status = streamTaskGetStatus(pTask).state; if (req.mndTrigger == 1) { @@ -1137,13 +1146,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpointId:%" PRId64 ", set it failure", pTask->id.idStr, req.checkpointId); - taosThreadMutexUnlock(&pTask->lock); + streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; } } else { @@ -1162,7 +1170,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) " transId:%d already handled, ignore msg and continue process checkpoint", pTask->id.idStr, checkpointId, req.transId); - taosThreadMutexUnlock(&pTask->lock); + streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; @@ -1171,19 +1179,24 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 " transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId); - taosThreadMutexUnlock(&pTask->lock); + streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } } - streamProcessCheckpointSourceReq(pTask, &req); - taosThreadMutexUnlock(&pTask->lock); + code = streamProcessCheckpointSourceReq(pTask, &req); + streamMutexUnlock(&pTask->lock); + + if (code) { + qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId, tstrerror(code)); + return code; + } if (req.mndTrigger) { qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ", pTask->id.idStr, @@ -1198,13 +1211,13 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); if (code != TSDB_CODE_SUCCESS) { SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs - return code; + return TSDB_CODE_SUCCESS; } streamMetaReleaseTask(pMeta, pTask); - return code; + return TSDB_CODE_SUCCESS; } // downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 7c0ddb6175..66e503fd89 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -896,14 +896,14 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo } int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { - int32_t code = 0, lino = 0; - SUdfColumnMeta* meta = &udfCol->colMeta; + int32_t code = 0, lino = 0; + SUdfColumnMeta *meta = &udfCol->colMeta; SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1); code = blockDataAppendColInfo(block, &colInfoData); TAOS_CHECK_GOTO(code, &lino, _exit); - code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows); + code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows); TAOS_CHECK_GOTO(code, &lino, _exit); SColumnInfoData *col = NULL; diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index db0784d572..0dbb6ed454 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -80,7 +80,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { return tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg); } -int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { +void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE; if (numOfTicks <= 0) { numOfTicks = 1; @@ -91,10 +91,10 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) // add ref for task SStreamTask* p = NULL; int32_t code = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, &p); - if (p == NULL) { + if (p == NULL || code != 0) { stError("s-task:0x%x failed to acquire task, status:%s, not exec scan-history data", pTask->id.taskId, streamTaskGetStatus(pTask).name); - return TSDB_CODE_SUCCESS; + return; } pTask->schedHistoryInfo.numOfTicks = numOfTicks; @@ -109,8 +109,6 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); } - - return TSDB_CODE_SUCCESS; } int32_t streamTaskStartScanHistory(SStreamTask* pTask) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 141bd8fc3e..99528d01b0 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -766,8 +766,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) { int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) { SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno; } pReq->head.vgId = vgId; @@ -1089,11 +1088,10 @@ int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeChec return TSDB_CODE_SUCCESS; } -int32_t streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) { +void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) { pTask->chkInfo.pActiveInfo->transId = transId; pTask->chkInfo.pActiveInfo->activeId = checkpointId; pTask->chkInfo.pActiveInfo->failedId = checkpointId; - return TSDB_CODE_SUCCESS; } int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) { diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index c3e0df52d4..275c9255d2 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -270,7 +270,7 @@ int32_t streamTaskRestoreStatus(SStreamTask* pTask) { code = TSDB_CODE_FAILED; // failed to restore the status, since it is not in pause status } - (void)taosThreadMutexUnlock(&pTask->lock); + streamMutexUnlock(&pTask->lock); return code; }