diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 36e9b3309c..0c769e7811 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -36,6 +36,7 @@ typedef struct SDeleteRes { SArray* uidList; int64_t skey; int64_t ekey; + int64_t affectedRows; } SDeleteRes; typedef struct SQWorkerCfg { @@ -82,7 +83,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); -int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp, SDeleteRes *pRes); +int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes); void qWorkerDestroy(void **qWorkerMgmt); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 2ba3d37a5c..4ac1030269 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -288,7 +288,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRp switch (pMsg->msgType) { case TDMT_VND_DELETE: - return qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, pRsp, &res); + return qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); default: vError("unknown msg type:%d in write queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index 8c7c030dce..9b2b67013d 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -31,7 +31,7 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); -int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes *pRes); +int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes); int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 82a62b5c5a..78c094aac7 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -536,8 +536,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ } -int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp, SDeleteRes *pRes) { - if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == pRsp) { +int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -558,7 +558,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SR QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql); taosMemoryFreeClear(req.sql); - QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRsp, pRes)); + QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes)); QW_SCH_TASK_DLOG("processDelete end, node:%p", node); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 4250785f7c..eb40b00d14 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -242,9 +242,8 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, return TSDB_CODE_SUCCESS; } -int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SDeleteRes *pRes) { +int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) { int32_t len = 0; - SVDeleteRsp rsp = {0}; bool queryEnd = false; int32_t code = 0; SOutputData output = {0}; @@ -270,21 +269,11 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen SDeleterRes* pDelRes = (SDeleterRes*)output.pData; - rsp.affectedRows = pDelRes->affectedRows; pRes->suid = pDelRes->suid; pRes->uidList = pDelRes->uidList; pRes->skey = pDelRes->skey; pRes->ekey = pDelRes->ekey; - - SEncoder coder = {0}; - tEncodeSize(tEncodeSVDeleteRsp, &rsp, len, code); - void *msg = rpcMallocCont(len); - tEncoderInit(&coder, msg, len); - tEncodeSVDeleteRsp(&coder, &rsp); - tEncoderClear(&coder); - - *rspMsg = msg; - *dataLen = len; + pRes->affectedRows = pDelRes->affectedRows; return TSDB_CODE_SUCCESS; } @@ -931,7 +920,7 @@ _return: qwRelease(refId); } -int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes *pRes) { +int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { int32_t code = 0; SSubplan *plan = NULL; qTaskInfo_t pTaskInfo = NULL; @@ -963,7 +952,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx, NULL)); - QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, &pRsp->contLen, &pRsp->pCont, pRes)); + QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, pRes)); _return: