fix: remove rpc response msg in delete

This commit is contained in:
dapan1121 2022-06-28 16:35:27 +08:00
parent d3e741a84e
commit 2b9aff693b
5 changed files with 11 additions and 21 deletions

View File

@ -36,6 +36,7 @@ typedef struct SDeleteRes {
SArray* uidList; SArray* uidList;
int64_t skey; int64_t skey;
int64_t ekey; int64_t ekey;
int64_t affectedRows;
} SDeleteRes; } SDeleteRes;
typedef struct SQWorkerCfg { typedef struct SQWorkerCfg {
@ -82,7 +83,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp, SDeleteRes *pRes); int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes);
void qWorkerDestroy(void **qWorkerMgmt); void qWorkerDestroy(void **qWorkerMgmt);

View File

@ -288,7 +288,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRp
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_DELETE: case TDMT_VND_DELETE:
return qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, pRsp, &res); return qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
default: default:
vError("unknown msg type:%d in write queue", pMsg->msgType); vError("unknown msg type:%d in write queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;

View File

@ -31,7 +31,7 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req);
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes *pRes); int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes);
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);

View File

@ -536,8 +536,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
} }
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp, SDeleteRes *pRes) { int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == pRsp) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
@ -558,7 +558,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SR
QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql); QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
taosMemoryFreeClear(req.sql); taosMemoryFreeClear(req.sql);
QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRsp, pRes)); QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes));
QW_SCH_TASK_DLOG("processDelete end, node:%p", node); QW_SCH_TASK_DLOG("processDelete end, node:%p", node);

View File

@ -242,9 +242,8 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SDeleteRes *pRes) { int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) {
int32_t len = 0; int32_t len = 0;
SVDeleteRsp rsp = {0};
bool queryEnd = false; bool queryEnd = false;
int32_t code = 0; int32_t code = 0;
SOutputData output = {0}; SOutputData output = {0};
@ -270,21 +269,11 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen
SDeleterRes* pDelRes = (SDeleterRes*)output.pData; SDeleterRes* pDelRes = (SDeleterRes*)output.pData;
rsp.affectedRows = pDelRes->affectedRows;
pRes->suid = pDelRes->suid; pRes->suid = pDelRes->suid;
pRes->uidList = pDelRes->uidList; pRes->uidList = pDelRes->uidList;
pRes->skey = pDelRes->skey; pRes->skey = pDelRes->skey;
pRes->ekey = pDelRes->ekey; pRes->ekey = pDelRes->ekey;
pRes->affectedRows = pDelRes->affectedRows;
SEncoder coder = {0};
tEncodeSize(tEncodeSVDeleteRsp, &rsp, len, code);
void *msg = rpcMallocCont(len);
tEncoderInit(&coder, msg, len);
tEncodeSVDeleteRsp(&coder, &rsp);
tEncoderClear(&coder);
*rspMsg = msg;
*dataLen = len;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -931,7 +920,7 @@ _return:
qwRelease(refId); qwRelease(refId);
} }
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes *pRes) { int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
int32_t code = 0; int32_t code = 0;
SSubplan *plan = NULL; SSubplan *plan = NULL;
qTaskInfo_t pTaskInfo = NULL; qTaskInfo_t pTaskInfo = NULL;
@ -963,7 +952,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx, NULL)); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx, NULL));
QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, &pRsp->contLen, &pRsp->pCont, pRes)); QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, pRes));
_return: _return: