Merge pull request #26819 from taosdata/fix/syntax
fix(query): check return value.
This commit is contained in:
commit
2f008317b2
|
@ -201,7 +201,7 @@ void qStreamSetOpen(qTaskInfo_t tinfo);
|
|||
|
||||
void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded);
|
||||
|
||||
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
|
||||
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
|
||||
|
||||
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
|
||||
|
||||
|
|
|
@ -682,13 +682,13 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);
|
|||
// fill-history task
|
||||
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
|
||||
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
|
||||
int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration);
|
||||
void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration);
|
||||
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
||||
|
||||
// checkpoint related
|
||||
void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId);
|
||||
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId);
|
||||
int32_t streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId);
|
||||
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId);
|
||||
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId);
|
||||
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal);
|
||||
void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
|
||||
|
|
|
@ -169,7 +169,7 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
|
|||
}
|
||||
dataRsp.common.blockNum = 0;
|
||||
char buf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset);
|
||||
(void) tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset);
|
||||
tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf,
|
||||
req.reqId);
|
||||
|
||||
|
@ -187,8 +187,8 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq*
|
|||
|
||||
char buf1[TSDB_OFFSET_LEN] = {0};
|
||||
char buf2[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset);
|
||||
tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset);
|
||||
(void) tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset);
|
||||
(void) tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset);
|
||||
|
||||
tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
|
||||
vgId, pReq->consumerId, pReq->epoch, ((SMqDataRspCommon*)pRsp)->blockNum, buf1, buf2, pReq->reqId);
|
||||
|
@ -412,7 +412,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
char buf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
|
||||
(void) tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
|
||||
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
|
||||
|
||||
|
@ -745,7 +745,7 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
|
|||
}
|
||||
|
||||
streamTaskResetUpstreamStageInfo(pTask);
|
||||
streamSetupScheduleTrigger(pTask);
|
||||
(void) streamSetupScheduleTrigger(pTask);
|
||||
|
||||
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
||||
tqSetRestoreVersionInfo(pTask);
|
||||
|
@ -801,8 +801,11 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
|||
if (done) {
|
||||
qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer,
|
||||
pStep2Range->maxVer, 0.0);
|
||||
streamTaskPutTranstateIntoInputQ(pTask);
|
||||
streamExecTask(pTask); // exec directly
|
||||
int32_t code = streamTaskPutTranstateIntoInputQ(pTask); // todo: msg lost.
|
||||
if (code) {
|
||||
qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code));
|
||||
}
|
||||
(void) streamExecTask(pTask); // exec directly
|
||||
} else {
|
||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||
tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64
|
||||
|
@ -811,7 +814,10 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
|||
pStreamTask->id.idStr);
|
||||
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
||||
|
||||
streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow);
|
||||
int32_t code = streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow);
|
||||
if (code) {
|
||||
tqError("s-task:%s level:%d failed to set step2 param", id, pTask->info.taskLevel);
|
||||
}
|
||||
|
||||
int64_t dstVer = pStep2Range->minVer;
|
||||
pTask->chkInfo.nextProcessVer = dstVer;
|
||||
|
@ -820,12 +826,12 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
|||
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
|
||||
pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE);
|
||||
|
||||
/*int8_t status = */ streamTaskSetSchedStatusInactive(pTask);
|
||||
(void) streamTaskSetSchedStatusInactive(pTask);
|
||||
|
||||
// now the fill-history task starts to scan data from wal files.
|
||||
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
tqScanWalAsync(pTq, false);
|
||||
(void) tqScanWalAsync(pTq, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -949,11 +955,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
pTask->streamTaskId.taskId, pTask->id.idStr);
|
||||
|
||||
tqDebug("s-task:%s fill-history task set status to be dropping", id);
|
||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
|
||||
code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
|
||||
|
||||
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return -1;
|
||||
return code; // todo: handle failure
|
||||
}
|
||||
|
||||
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||
|
@ -971,15 +977,14 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
// extracted submit data from wal files for all tasks
|
||||
if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
|
||||
tqScanWal(pTq);
|
||||
return 0;
|
||||
return tqScanWal(pTq);
|
||||
}
|
||||
|
||||
int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
|
||||
|
||||
// let's continue scan data in the wal files
|
||||
if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) {
|
||||
tqScanWalAsync(pTq, false);
|
||||
(void) tqScanWalAsync(pTq, false); // it's ok to failed
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -1044,7 +1049,11 @@ int32_t tqStreamProgressRetrieveReq(STQ *pTq, SRpcMsg *pMsg) {
|
|||
pRsp->subFetchIdx = req.subFetchIdx;
|
||||
pRsp->vgId = req.vgId;
|
||||
pRsp->streamId = req.streamId;
|
||||
tSerializeStreamProgressRsp(pRsp, sizeof(SStreamProgressRsp) + sizeof(SMsgHead), pRsp);
|
||||
code = tSerializeStreamProgressRsp(pRsp, sizeof(SStreamProgressRsp) + sizeof(SMsgHead), pRsp);
|
||||
if (code) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||
rsp.pCont = pRspBuf;
|
||||
pRspBuf = NULL;
|
||||
|
@ -1079,18 +1088,18 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
|
||||
|
||||
SRpcMsg rsp = {0};
|
||||
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return code;
|
||||
return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
if (!vnodeIsRoleLeader(pTq->pVnode)) {
|
||||
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
|
||||
SRpcMsg rsp = {0};
|
||||
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
|
||||
}
|
||||
|
||||
if (!pTq->pVnode->restored) {
|
||||
|
@ -1098,9 +1107,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
", transId:%d s-task:0x%x ignore it",
|
||||
vgId, req.checkpointId, req.transId, req.taskId);
|
||||
SRpcMsg rsp = {0};
|
||||
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode
|
||||
}
|
||||
|
||||
SStreamTask* pTask = NULL;
|
||||
|
@ -1110,7 +1119,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
" transId:%d it may have been destroyed",
|
||||
vgId, req.taskId, req.checkpointId, req.transId);
|
||||
SRpcMsg rsp = {0};
|
||||
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1123,13 +1132,13 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
SRpcMsg rsp = {0};
|
||||
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return TSDB_CODE_SUCCESS; // todo retry handle error
|
||||
}
|
||||
|
||||
// todo save the checkpoint failed info
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
streamMutexLock(&pTask->lock);
|
||||
ETaskStatus status = streamTaskGetStatus(pTask).state;
|
||||
|
||||
if (req.mndTrigger == 1) {
|
||||
|
@ -1137,13 +1146,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpointId:%" PRId64 ", set it failure",
|
||||
pTask->id.idStr, req.checkpointId);
|
||||
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
SRpcMsg rsp = {0};
|
||||
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
|
@ -1162,7 +1170,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
" transId:%d already handled, ignore msg and continue process checkpoint",
|
||||
pTask->id.idStr, checkpointId, req.transId);
|
||||
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1171,19 +1179,24 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
|
||||
" transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId);
|
||||
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
SRpcMsg rsp = {0};
|
||||
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
streamProcessCheckpointSourceReq(pTask, &req);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
code = streamProcessCheckpointSourceReq(pTask, &req);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
if (code) {
|
||||
qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
if (req.mndTrigger) {
|
||||
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ", pTask->id.idStr,
|
||||
|
@ -1198,13 +1211,13 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
SRpcMsg rsp = {0};
|
||||
streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return code;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return code;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task
|
||||
|
|
|
@ -115,14 +115,15 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
|
|||
if (pDataBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
STqOffsetVal offset = {0};
|
||||
qStreamExtractOffset(task, &offset);
|
||||
code = qStreamExtractOffset(task, &offset);
|
||||
TSDB_CHECK_CODE(code, line, END);
|
||||
|
||||
pHandle->block = NULL;
|
||||
|
||||
code = createOneDataBlock(pDataBlock, true, &pHandle->block);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, line, END);
|
||||
|
||||
pHandle->blockTime = offset.ts;
|
||||
tOffsetDestroy(&offset);
|
||||
|
@ -140,8 +141,11 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
|
|||
} else {
|
||||
code = copyDataBlock(pHandle->block, pDataBlock);
|
||||
TSDB_CHECK_CODE(code, line, END);
|
||||
|
||||
STqOffsetVal offset = {0};
|
||||
qStreamExtractOffset(task, &offset);
|
||||
code = qStreamExtractOffset(task, &offset);
|
||||
TSDB_CHECK_CODE(code, line, END);
|
||||
|
||||
pRsp->sleepTime = offset.ts - pHandle->blockTime;
|
||||
pHandle->blockTime = offset.ts;
|
||||
tOffsetDestroy(&offset);
|
||||
|
@ -164,10 +168,11 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
|
|||
|
||||
tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d",
|
||||
pHandle->consumerId, vgId, pRsp->common.blockNum, totalRows);
|
||||
qStreamExtractOffset(task, &pRsp->common.rspOffset);
|
||||
code = qStreamExtractOffset(task, &pRsp->common.rspOffset);
|
||||
END:
|
||||
if ( code!= 0){
|
||||
tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line, code);
|
||||
if (code != 0) {
|
||||
tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line,
|
||||
code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -241,31 +246,40 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatc
|
|||
// get meta
|
||||
SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
|
||||
if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
|
||||
qStreamExtractOffset(task, &tmp->rspOffset);
|
||||
code = qStreamExtractOffset(task, &tmp->rspOffset);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
*pBatchMetaRsp = *tmp;
|
||||
tqDebug("tmqsnap task get meta");
|
||||
break;
|
||||
}
|
||||
|
||||
if (pDataBlock == NULL) {
|
||||
qStreamExtractOffset(task, pOffset);
|
||||
code = qStreamExtractOffset(task, pOffset);
|
||||
if (code) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
continue;
|
||||
}
|
||||
|
||||
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
|
||||
pHandle->snapshotVer + 1);
|
||||
qStreamExtractOffset(task, &pRsp->common.rspOffset);
|
||||
code = qStreamExtractOffset(task, &pRsp->common.rspOffset);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pRsp->common.blockNum > 0) {
|
||||
tqDebug("tmqsnap task exec exited, get data");
|
||||
qStreamExtractOffset(task, &pRsp->common.rspOffset);
|
||||
code = qStreamExtractOffset(task, &pRsp->common.rspOffset);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ typedef struct {
|
|||
static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo);
|
||||
static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
|
||||
STsdbReader* pReader);
|
||||
static int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes);
|
||||
static void getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes);
|
||||
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey,
|
||||
STsdbReader* pReader);
|
||||
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
|
||||
|
@ -3866,11 +3866,11 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
|
|||
return false;
|
||||
}
|
||||
|
||||
FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes) {
|
||||
FORCE_INLINE void getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes) {
|
||||
*pRes = NULL;
|
||||
|
||||
if (!pIter->hasVal) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t order = pReader->info.order;
|
||||
|
@ -3880,20 +3880,20 @@ FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, ST
|
|||
TSDBROW_INIT_KEY(pRow, key);
|
||||
if (outOfTimeWindow(key.ts, &pReader->info.window)) {
|
||||
pIter->hasVal = false;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return;
|
||||
}
|
||||
|
||||
// it is a valid data version
|
||||
if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) {
|
||||
if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
|
||||
*pRes = pRow;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return;
|
||||
} else {
|
||||
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange,
|
||||
pReader->suppInfo.numOfPks > 0);
|
||||
if (!dropped) {
|
||||
*pRes = pRow;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3901,7 +3901,7 @@ FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, ST
|
|||
while (1) {
|
||||
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
|
||||
if (!pIter->hasVal) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return;
|
||||
}
|
||||
|
||||
pRow = tsdbTbDataIterGet(pIter->iter);
|
||||
|
@ -3909,19 +3909,19 @@ FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, ST
|
|||
TSDBROW_INIT_KEY(pRow, key);
|
||||
if (outOfTimeWindow(key.ts, &pReader->info.window)) {
|
||||
pIter->hasVal = false;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return;
|
||||
}
|
||||
|
||||
if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) {
|
||||
if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
|
||||
*pRes = pRow;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return;
|
||||
} else {
|
||||
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange,
|
||||
pReader->suppInfo.numOfPks > 0);
|
||||
if (!dropped) {
|
||||
*pRes = pRow;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1158,14 +1158,15 @@ SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
|
|||
return &pTaskInfo->streamInfo.btMetaRsp;
|
||||
}
|
||||
|
||||
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
|
||||
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
int32_t code = tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset);
|
||||
return 0;
|
||||
/*if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
}*/
|
||||
}
|
||||
|
||||
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
|
||||
|
@ -1436,8 +1437,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
}
|
||||
|
||||
end:
|
||||
tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset);
|
||||
|
||||
(void) tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -309,14 +309,8 @@ int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo
|
|||
}
|
||||
|
||||
code = extractQualifiedTupleByFilterResult(pBlock, p, status);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
|
||||
_err:
|
||||
|
||||
colDataDestroy(p);
|
||||
taosMemoryFree(p);
|
||||
|
||||
|
@ -379,14 +373,8 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM
|
|||
}
|
||||
|
||||
code = extractQualifiedTupleByFilterResult(pBlock, p, status);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _return;
|
||||
}
|
||||
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
colDataDestroy(p);
|
||||
taosMemoryFree(p);
|
||||
|
||||
|
|
|
@ -896,14 +896,14 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
|
|||
}
|
||||
|
||||
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
|
||||
int32_t code = 0, lino = 0;
|
||||
SUdfColumnMeta* meta = &udfCol->colMeta;
|
||||
int32_t code = 0, lino = 0;
|
||||
SUdfColumnMeta *meta = &udfCol->colMeta;
|
||||
|
||||
SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1);
|
||||
code = blockDataAppendColInfo(block, &colInfoData);
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
|
||||
code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows);
|
||||
code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows);
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
|
||||
SColumnInfoData *col = NULL;
|
||||
|
|
|
@ -80,7 +80,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
|
|||
return tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg);
|
||||
}
|
||||
|
||||
int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
|
||||
void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
|
||||
int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
|
||||
if (numOfTicks <= 0) {
|
||||
numOfTicks = 1;
|
||||
|
@ -91,10 +91,10 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
|
|||
// add ref for task
|
||||
SStreamTask* p = NULL;
|
||||
int32_t code = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, &p);
|
||||
if (p == NULL) {
|
||||
if (p == NULL || code != 0) {
|
||||
stError("s-task:0x%x failed to acquire task, status:%s, not exec scan-history data", pTask->id.taskId,
|
||||
streamTaskGetStatus(pTask).name);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return;
|
||||
}
|
||||
|
||||
pTask->schedHistoryInfo.numOfTicks = numOfTicks;
|
||||
|
@ -109,8 +109,6 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
|
|||
streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
|
||||
&pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
|
||||
|
|
|
@ -766,8 +766,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
|
|||
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) {
|
||||
SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pReq->head.vgId = vgId;
|
||||
|
@ -1089,11 +1088,10 @@ int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeChec
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) {
|
||||
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) {
|
||||
pTask->chkInfo.pActiveInfo->transId = transId;
|
||||
pTask->chkInfo.pActiveInfo->activeId = checkpointId;
|
||||
pTask->chkInfo.pActiveInfo->failedId = checkpointId;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
|
||||
|
|
|
@ -270,7 +270,7 @@ int32_t streamTaskRestoreStatus(SStreamTask* pTask) {
|
|||
code = TSDB_CODE_FAILED; // failed to restore the status, since it is not in pause status
|
||||
}
|
||||
|
||||
(void)taosThreadMutexUnlock(&pTask->lock);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue