fix:[TD-31017]process return value in vnode for tmq
This commit is contained in:
parent
3f2278ebcf
commit
e53c8a35a4
|
@ -336,7 +336,7 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
.pCont = pHandle->msg->pCont,
|
.pCont = pHandle->msg->pCont,
|
||||||
.contLen = pHandle->msg->contLen,
|
.contLen = pHandle->msg->contLen,
|
||||||
.info = pHandle->msg->info};
|
.info = pHandle->msg->info};
|
||||||
tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
|
(void)tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
|
||||||
taosMemoryFree(pHandle->msg);
|
taosMemoryFree(pHandle->msg);
|
||||||
pHandle->msg = NULL;
|
pHandle->msg = NULL;
|
||||||
}
|
}
|
||||||
|
@ -451,8 +451,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
|
tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
|
||||||
return terrno;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void* buf = rpcMallocCont(len);
|
void* buf = rpcMallocCont(len);
|
||||||
|
@ -462,8 +461,12 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, buf, len);
|
tEncoderInit(&encoder, buf, len);
|
||||||
tEncodeMqVgOffset(&encoder, &vgOffset);
|
code = tEncodeMqVgOffset(&encoder, &vgOffset);
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
if (code < 0) {
|
||||||
|
rpcFreeCont(buf);
|
||||||
|
return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
|
||||||
|
}
|
||||||
|
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};
|
SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};
|
||||||
|
|
||||||
|
|
|
@ -415,7 +415,7 @@ static int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
|
||||||
END:
|
END:
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
tdbFree(pVal);
|
tdbFree(pVal);
|
||||||
tdbTbcClose(pCur);
|
(void)tdbTbcClose(pCur);
|
||||||
tDeleteSTqCheckInfo(&info);
|
tDeleteSTqCheckInfo(&info);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue