From a13225d990a7f6713af793e744f4f80dc330a6e2 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 25 Jan 2022 16:48:27 +0800 Subject: [PATCH] fix qcomplete issue --- source/libs/qworker/inc/qworkerMsg.h | 2 +- source/libs/qworker/src/qworker.c | 6 ++++-- source/libs/qworker/src/qworkerMsg.c | 4 ++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index 7735e1a1ee..51f55d238f 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -32,7 +32,7 @@ int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t int32_t qwBuildAndSendDropRsp(void *connection, int32_t code); int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code); int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); -void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len); +void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection); int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection); int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 8ffa6cd60d..c28e30333c 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1116,7 +1116,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (rsp) { - qwBuildFetchRsp(rsp, &sOutput, dataLen); + bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); + qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); @@ -1196,7 +1197,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (NULL == rsp) { QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); } else { - qwBuildFetchRsp(rsp, &sOutput, dataLen); + bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); + qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); } if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 0fd3dcb072..56c882f404 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -26,11 +26,11 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { return TSDB_CODE_SUCCESS; } -void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len) { +void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; rsp->useconds = htobe64(input->useconds); - rsp->completed = input->queryEnd; + rsp->completed = qComplete; rsp->precision = input->precision; rsp->compressed = input->compressed; rsp->compLen = htonl(len);