diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index fc3f97ec88..941deb03cd 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1048,6 +1048,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); atomic_store_8((int8_t*)&ctx->queryEnd, qComplete); + + qwMsg->connInfo = ctx->connInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); @@ -1068,6 +1070,8 @@ _return: QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); qwFreeFetchRsp(rsp); rsp = NULL; + + qwMsg->connInfo = ctx->connInfo; qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0); } @@ -1107,6 +1111,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (NULL == rsp) { atomic_store_ptr(&ctx->connInfo.handle, qwMsg->connInfo.handle); atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle); + QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); } else { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);