diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 2598b5c28c..91db78e9cd 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -90,7 +90,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 */ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); -void qSetTaskCode(qTaskInfo_t tinfo, int32_t code); +//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index a318b9886e..922a85c094 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -75,7 +75,7 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg); if (code != 0) { if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr(code)); + dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, tstrerror(code)); vmSendRsp(pMsg, code); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index da366c1610..ce52277d71 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -384,7 +384,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { bool exec = tqIsHandleExec(pHandle); if(!exec) { tqSetHandleExec(pHandle); - qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); +// qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle); taosWUnLockLatch(&pTq->lock); break; @@ -586,9 +586,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg qKillTask(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); } - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - qStreamCloseTsdbReader(pTaskInfo); - } +// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { +// qStreamCloseTsdbReader(pTaskInfo); +// } } // remove if it has been register in the push manager, and return one empty block to consumer tqUnregisterPushHandle(pTq, pHandle); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index e4b2fa8821..2f470e2221 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -84,8 +84,10 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs qStreamSetOpen(task); tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId); - if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) { - tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr()); + code = qExecTask(task, &pDataBlock, &ts); + if (code != TSDB_CODE_SUCCESS) { + tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code)); + terrno = code; return -1; } @@ -128,8 +130,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; tqDebug("tmqsnap task start to execute"); - if (qExecTask(task, &pDataBlock, &ts) < 0) { - tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr()); + int code = qExecTask(task, &pDataBlock, &ts); + if (code != 0) { + tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code)); + terrno = code; return -1; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 7d251fb074..a73deffa52 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -180,10 +180,10 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { doSetTaskId(pTaskInfo->pRoot); } -void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) { - SExecTaskInfo* pTaskInfo = tinfo; - pTaskInfo->code = code; -} +//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) { +// SExecTaskInfo* pTaskInfo = tinfo; +// pTaskInfo->code = code; +//} int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { if (tinfo == NULL) { @@ -1080,7 +1080,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT const char* id = GET_TASKID(pTaskInfo); // if pOffset equal to current offset, means continue consume - if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset) && pOffset->type != TMQ_OFFSET__SNAPSHOT_DATA) { + if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) { return 0; }