diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 324804aa7c..09143dde29 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -201,7 +201,7 @@ void qStreamSetOpen(qTaskInfo_t tinfo); void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded); -void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); +int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5f322be99b..0d1e62cdbe 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -682,13 +682,13 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); // fill-history task int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); -int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration); +void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); // checkpoint related void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId); int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId); -int32_t streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId); +void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId); bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId); void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal); void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0ab420ce63..8a4571a4af 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -169,7 +169,7 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { } dataRsp.common.blockNum = 0; char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset); + (void) tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset); tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId); @@ -187,8 +187,8 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* char buf1[TSDB_OFFSET_LEN] = {0}; char buf2[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset); - tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset); + (void) tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset); + (void) tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset); tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId, pReq->consumerId, pReq->epoch, ((SMqDataRspCommon*)pRsp)->blockNum, buf1, buf2, pReq->reqId); @@ -412,7 +412,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); + (void) tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); @@ -745,7 +745,7 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV } streamTaskResetUpstreamStageInfo(pTask); - streamSetupScheduleTrigger(pTask); + (void) streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; tqSetRestoreVersionInfo(pTask); @@ -801,8 +801,11 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask if (done) { qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer, pStep2Range->maxVer, 0.0); - streamTaskPutTranstateIntoInputQ(pTask); - streamExecTask(pTask); // exec directly + int32_t code = streamTaskPutTranstateIntoInputQ(pTask); // todo: msg lost. + if (code) { + qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code)); + } + (void) streamExecTask(pTask); // exec directly } else { STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 @@ -811,7 +814,10 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask pStreamTask->id.idStr); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow); + int32_t code = streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow); + if (code) { + tqError("s-task:%s level:%d failed to set step2 param", id, pTask->info.taskLevel); + } int64_t dstVer = pStep2Range->minVer; pTask->chkInfo.nextProcessVer = dstVer; @@ -820,12 +826,12 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE); - /*int8_t status = */ streamTaskSetSchedStatusInactive(pTask); + (void) streamTaskSetSchedStatusInactive(pTask); // now the fill-history task starts to scan data from wal files. - int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); if (code == TSDB_CODE_SUCCESS) { - tqScanWalAsync(pTq, false); + (void) tqScanWalAsync(pTq, false); } } } @@ -949,11 +955,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pTask->streamTaskId.taskId, pTask->id.idStr); tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0); + code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0); streamMetaReleaseTask(pMeta, pTask); - return -1; + return code; // todo: handle failure } ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); @@ -971,15 +977,14 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // extracted submit data from wal files for all tasks if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { - tqScanWal(pTq); - return 0; + return tqScanWal(pTq); } int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); // let's continue scan data in the wal files if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) { - tqScanWalAsync(pTq, false); + (void) tqScanWalAsync(pTq, false); // it's ok to failed } return code; @@ -1044,7 +1049,11 @@ int32_t tqStreamProgressRetrieveReq(STQ *pTq, SRpcMsg *pMsg) { pRsp->subFetchIdx = req.subFetchIdx; pRsp->vgId = req.vgId; pRsp->streamId = req.streamId; - tSerializeStreamProgressRsp(pRsp, sizeof(SStreamProgressRsp) + sizeof(SMsgHead), pRsp); + code = tSerializeStreamProgressRsp(pRsp, sizeof(SStreamProgressRsp) + sizeof(SMsgHead), pRsp); + if (code) { + goto _OVER; + } + SRpcMsg rsp = {.info = pMsg->info, .code = 0}; rsp.pCont = pRspBuf; pRspBuf = NULL; @@ -1079,18 +1088,18 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs - return code; + return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode } tDecoderClear(&decoder); if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode } if (!pTq->pVnode->restored) { @@ -1098,9 +1107,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) ", transId:%d s-task:0x%x ignore it", vgId, req.checkpointId, req.transId, req.taskId); SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode } SStreamTask* pTask = NULL; @@ -1110,7 +1119,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) " transId:%d it may have been destroyed", vgId, req.taskId, req.checkpointId, req.transId); SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1123,13 +1132,13 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; // todo retry handle error } // todo save the checkpoint failed info - taosThreadMutexLock(&pTask->lock); + streamMutexLock(&pTask->lock); ETaskStatus status = streamTaskGetStatus(pTask).state; if (req.mndTrigger == 1) { @@ -1137,13 +1146,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpointId:%" PRId64 ", set it failure", pTask->id.idStr, req.checkpointId); - taosThreadMutexUnlock(&pTask->lock); + streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; } } else { @@ -1162,7 +1170,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) " transId:%d already handled, ignore msg and continue process checkpoint", pTask->id.idStr, checkpointId, req.transId); - taosThreadMutexUnlock(&pTask->lock); + streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; @@ -1171,19 +1179,24 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 " transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId); - taosThreadMutexUnlock(&pTask->lock); + streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } } - streamProcessCheckpointSourceReq(pTask, &req); - taosThreadMutexUnlock(&pTask->lock); + code = streamProcessCheckpointSourceReq(pTask, &req); + streamMutexUnlock(&pTask->lock); + + if (code) { + qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId, tstrerror(code)); + return code; + } if (req.mndTrigger) { qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ", pTask->id.idStr, @@ -1198,13 +1211,13 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); if (code != TSDB_CODE_SUCCESS) { SRpcMsg rsp = {0}; - streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs - return code; + return TSDB_CODE_SUCCESS; } streamMetaReleaseTask(pMeta, pTask); - return code; + return TSDB_CODE_SUCCESS; } // downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 5df8c97962..d072d7199c 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -115,14 +115,15 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* if (pDataBlock == NULL) { break; } + STqOffsetVal offset = {0}; - qStreamExtractOffset(task, &offset); + code = qStreamExtractOffset(task, &offset); + TSDB_CHECK_CODE(code, line, END); + pHandle->block = NULL; code = createOneDataBlock(pDataBlock, true, &pHandle->block); - if (code) { - return code; - } + TSDB_CHECK_CODE(code, line, END); pHandle->blockTime = offset.ts; tOffsetDestroy(&offset); @@ -140,8 +141,11 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* } else { code = copyDataBlock(pHandle->block, pDataBlock); TSDB_CHECK_CODE(code, line, END); + STqOffsetVal offset = {0}; - qStreamExtractOffset(task, &offset); + code = qStreamExtractOffset(task, &offset); + TSDB_CHECK_CODE(code, line, END); + pRsp->sleepTime = offset.ts - pHandle->blockTime; pHandle->blockTime = offset.ts; tOffsetDestroy(&offset); @@ -164,10 +168,11 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d", pHandle->consumerId, vgId, pRsp->common.blockNum, totalRows); - qStreamExtractOffset(task, &pRsp->common.rspOffset); + code = qStreamExtractOffset(task, &pRsp->common.rspOffset); END: - if ( code!= 0){ - tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line, code); + if (code != 0) { + tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line, + code); } return code; } @@ -241,31 +246,40 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatc // get meta SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task); if (taosArrayGetSize(tmp->batchMetaReq) > 0) { - qStreamExtractOffset(task, &tmp->rspOffset); + code = qStreamExtractOffset(task, &tmp->rspOffset); + if (code) { + return code; + } + *pBatchMetaRsp = *tmp; tqDebug("tmqsnap task get meta"); break; } if (pDataBlock == NULL) { - qStreamExtractOffset(task, pOffset); + code = qStreamExtractOffset(task, pOffset); + if (code) { + break; + } + if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { continue; } + tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1); - qStreamExtractOffset(task, &pRsp->common.rspOffset); + code = qStreamExtractOffset(task, &pRsp->common.rspOffset); break; } if (pRsp->common.blockNum > 0) { tqDebug("tmqsnap task exec exited, get data"); - qStreamExtractOffset(task, &pRsp->common.rspOffset); + code = qStreamExtractOffset(task, &pRsp->common.rspOffset); break; } } - return 0; + return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index a3210dbfd9..4fedb68a78 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -48,7 +48,7 @@ typedef struct { static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo); static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); -static int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes); +static void getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey, STsdbReader* pReader); static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, @@ -3866,11 +3866,11 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t return false; } -FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes) { +FORCE_INLINE void getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes) { *pRes = NULL; if (!pIter->hasVal) { - return TSDB_CODE_SUCCESS; + return; } int32_t order = pReader->info.order; @@ -3880,20 +3880,20 @@ FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, ST TSDBROW_INIT_KEY(pRow, key); if (outOfTimeWindow(key.ts, &pReader->info.window)) { pIter->hasVal = false; - return TSDB_CODE_SUCCESS; + return; } // it is a valid data version if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) { if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) { *pRes = pRow; - return TSDB_CODE_SUCCESS; + return; } else { bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0); if (!dropped) { *pRes = pRow; - return TSDB_CODE_SUCCESS; + return; } } } @@ -3901,7 +3901,7 @@ FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, ST while (1) { pIter->hasVal = tsdbTbDataIterNext(pIter->iter); if (!pIter->hasVal) { - return TSDB_CODE_SUCCESS; + return; } pRow = tsdbTbDataIterGet(pIter->iter); @@ -3909,19 +3909,19 @@ FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, ST TSDBROW_INIT_KEY(pRow, key); if (outOfTimeWindow(key.ts, &pReader->info.window)) { pIter->hasVal = false; - return TSDB_CODE_SUCCESS; + return; } if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) { if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) { *pRes = pRow; - return TSDB_CODE_SUCCESS; + return; } else { bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0); if (!dropped) { *pRes = pRow; - return TSDB_CODE_SUCCESS; + return; } } } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c4cc49e8ea..75c335a6a8 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1158,14 +1158,15 @@ SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { return &pTaskInfo->streamInfo.btMetaRsp; } -void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { +int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t code = tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset); - if (code != TSDB_CODE_SUCCESS) { + tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset); + return 0; + /*if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); - } + }*/ } int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) { @@ -1436,8 +1437,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } end: - tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset); - + (void) tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset); return 0; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 8db4c6335f..2c485cdd1b 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -309,14 +309,8 @@ int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo } code = extractQualifiedTupleByFilterResult(pBlock, p, status); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } - - code = TSDB_CODE_SUCCESS; _err: - colDataDestroy(p); taosMemoryFree(p); @@ -379,14 +373,8 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM } code = extractQualifiedTupleByFilterResult(pBlock, p, status); - if (code != TSDB_CODE_SUCCESS) { - goto _return; - } - - code = TSDB_CODE_SUCCESS; _return: - colDataDestroy(p); taosMemoryFree(p); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 7c0ddb6175..66e503fd89 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -896,14 +896,14 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo } int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { - int32_t code = 0, lino = 0; - SUdfColumnMeta* meta = &udfCol->colMeta; + int32_t code = 0, lino = 0; + SUdfColumnMeta *meta = &udfCol->colMeta; SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1); code = blockDataAppendColInfo(block, &colInfoData); TAOS_CHECK_GOTO(code, &lino, _exit); - code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows); + code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows); TAOS_CHECK_GOTO(code, &lino, _exit); SColumnInfoData *col = NULL; diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index db0784d572..0dbb6ed454 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -80,7 +80,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { return tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg); } -int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { +void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE; if (numOfTicks <= 0) { numOfTicks = 1; @@ -91,10 +91,10 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) // add ref for task SStreamTask* p = NULL; int32_t code = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, &p); - if (p == NULL) { + if (p == NULL || code != 0) { stError("s-task:0x%x failed to acquire task, status:%s, not exec scan-history data", pTask->id.taskId, streamTaskGetStatus(pTask).name); - return TSDB_CODE_SUCCESS; + return; } pTask->schedHistoryInfo.numOfTicks = numOfTicks; @@ -109,8 +109,6 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); } - - return TSDB_CODE_SUCCESS; } int32_t streamTaskStartScanHistory(SStreamTask* pTask) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 141bd8fc3e..99528d01b0 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -766,8 +766,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) { int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) { SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno; } pReq->head.vgId = vgId; @@ -1089,11 +1088,10 @@ int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeChec return TSDB_CODE_SUCCESS; } -int32_t streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) { +void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) { pTask->chkInfo.pActiveInfo->transId = transId; pTask->chkInfo.pActiveInfo->activeId = checkpointId; pTask->chkInfo.pActiveInfo->failedId = checkpointId; - return TSDB_CODE_SUCCESS; } int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) { diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index c3e0df52d4..275c9255d2 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -270,7 +270,7 @@ int32_t streamTaskRestoreStatus(SStreamTask* pTask) { code = TSDB_CODE_FAILED; // failed to restore the status, since it is not in pause status } - (void)taosThreadMutexUnlock(&pTask->lock); + streamMutexUnlock(&pTask->lock); return code; }