shm
This commit is contained in:
parent
232404fa86
commit
d76a0c9b6d
|
@ -61,7 +61,7 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
|
||||||
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
|
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
|
||||||
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq);
|
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq);
|
||||||
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
|
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
|
||||||
void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp);
|
void tmsgSendRsp(const SRpcMsg* pRsp);
|
||||||
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
|
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
|
||||||
void tmsgReleaseHandle(void* handle, int8_t type);
|
void tmsgReleaseHandle(void* handle, int8_t type);
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) {
|
||||||
return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq);
|
return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); }
|
void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); }
|
||||||
|
|
||||||
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
|
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
|
||||||
(*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg);
|
(*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg);
|
||||||
|
|
|
@ -141,7 +141,6 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg);
|
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg);
|
||||||
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
|
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
|
||||||
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type);
|
|
||||||
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
|
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
|
||||||
|
|
||||||
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
|
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
|
||||||
|
|
|
@ -122,8 +122,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t
|
||||||
ProcFuncType ftype) {
|
ProcFuncType ftype) {
|
||||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||||
pRpc->pCont = pCont;
|
pRpc->pCont = pCont;
|
||||||
dTrace("msg:%p, get from child queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle,
|
dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle);
|
||||||
pRpc->ahandle);
|
|
||||||
|
|
||||||
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
|
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
|
||||||
int32_t code = (*msgFp)(pWrapper, pMsg);
|
int32_t code = (*msgFp)(pWrapper, pMsg);
|
||||||
|
@ -144,8 +143,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t
|
||||||
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
|
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
|
||||||
ProcFuncType ftype) {
|
ProcFuncType ftype) {
|
||||||
pMsg->pCont = pCont;
|
pMsg->pCont = pCont;
|
||||||
dTrace("msg:%p, get from parent queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pMsg->msgType), pMsg->handle,
|
dTrace("msg:%p, get from parent queue, handle:%p app:%p", pMsg, pMsg->handle, pMsg->ahandle);
|
||||||
pMsg->ahandle);
|
|
||||||
|
|
||||||
switch (ftype) {
|
switch (ftype) {
|
||||||
case PROC_REG:
|
case PROC_REG:
|
||||||
|
@ -155,6 +153,9 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
|
||||||
rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
|
rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
|
||||||
rpcFreeCont(pCont);
|
rpcFreeCont(pCont);
|
||||||
break;
|
break;
|
||||||
|
case PROC_REQ:
|
||||||
|
// todo send to dnode
|
||||||
|
dndSendReqToMnode(pWrapper, pMsg);
|
||||||
default:
|
default:
|
||||||
dndSendRpcRsp(pWrapper, pMsg);
|
dndSendRpcRsp(pWrapper, pMsg);
|
||||||
break;
|
break;
|
||||||
|
@ -220,15 +221,23 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
|
||||||
dndClearNodesExecpt(pDnode, n);
|
dndClearNodesExecpt(pDnode, n);
|
||||||
|
|
||||||
dInfo("node:%s, will be initialized in child process", pWrapper->name);
|
dInfo("node:%s, will be initialized in child process", pWrapper->name);
|
||||||
dndOpenNode(pWrapper);
|
if (dndOpenNode(pWrapper) != 0) {
|
||||||
|
dInfo("node:%s, failed to init in child process since %s", pWrapper->name, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosProcRun(pProc) != 0) {
|
||||||
|
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
break;
|
||||||
} else {
|
} else {
|
||||||
dInfo("node:%s, will not start in parent process, child pid:%d", pWrapper->name, taosProcChildId(pProc));
|
dInfo("node:%s, will not start in parent process, child pid:%d", pWrapper->name, taosProcChildId(pProc));
|
||||||
pWrapper->procType = PROC_PARENT;
|
pWrapper->procType = PROC_PARENT;
|
||||||
}
|
if (taosProcRun(pProc) != 0) {
|
||||||
|
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
|
||||||
if (taosProcRun(pProc) != 0) {
|
return -1;
|
||||||
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
|
}
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -320,8 +320,7 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *p
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
|
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
|
||||||
if (pWrapper->procType == PROC_CHILD) {
|
if (pWrapper->procType != PROC_CHILD) {
|
||||||
} else {
|
|
||||||
SDnode *pDnode = pWrapper->pDnode;
|
SDnode *pDnode = pWrapper->pDnode;
|
||||||
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
|
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
|
||||||
terrno = TSDB_CODE_DND_OFFLINE;
|
terrno = TSDB_CODE_DND_OFFLINE;
|
||||||
|
@ -329,23 +328,24 @@ int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return dndSendRpcReq(&pDnode->trans, pEpSet, pReq);
|
return dndSendRpcReq(&pDnode->trans, pEpSet, pReq);
|
||||||
|
} else {
|
||||||
|
while (taosProcPutToParentQ(pWrapper->pProc, pReq, sizeof(SRpcMsg), pReq->pCont, pReq->contLen, PROC_REQ) != 0) {
|
||||||
|
taosMsleep(1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
|
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
|
||||||
if (pWrapper->procType == PROC_CHILD) {
|
SDnode *pDnode = pWrapper->pDnode;
|
||||||
} else {
|
STransMgmt *pTrans = &pDnode->trans;
|
||||||
SDnode *pDnode = pWrapper->pDnode;
|
SEpSet epSet = {0};
|
||||||
STransMgmt *pTrans = &pDnode->trans;
|
|
||||||
SEpSet epSet = {0};
|
|
||||||
|
|
||||||
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
|
SMgmtWrapper *pWrapper2 = dndAcquireWrapper(pDnode, DNODE);
|
||||||
if (pWrapper != NULL) {
|
if (pWrapper2 != NULL) {
|
||||||
dmGetMnodeEpSet(pWrapper->pMgmt, &epSet);
|
dmGetMnodeEpSet(pWrapper2->pMgmt, &epSet);
|
||||||
dndReleaseWrapper(pWrapper);
|
dndReleaseWrapper(pWrapper2);
|
||||||
}
|
|
||||||
return dndSendRpcReq(pTrans, &epSet, pReq);
|
|
||||||
}
|
}
|
||||||
|
return dndSendRpcReq(pTrans, &epSet, pReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
|
void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
|
||||||
|
@ -382,7 +382,7 @@ void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
|
static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
|
||||||
if (pWrapper->procType != PROC_CHILD) {
|
if (pWrapper->procType != PROC_CHILD) {
|
||||||
rpcReleaseHandle(handle, type);
|
rpcReleaseHandle(handle, type);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -68,12 +68,13 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
SWrapperCfg wrapperCfg = {0};
|
SWrapperCfg wrapperCfg = {0};
|
||||||
vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg);
|
vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg);
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (createReq.dnodeId != pMgmt->pDnode->dnodeId) {
|
if (createReq.dnodeId != pMgmt->pDnode->dnodeId) {
|
||||||
terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION;
|
terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION;
|
||||||
dDebug("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
|
dDebug("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId);
|
||||||
if (pVnode != NULL) {
|
if (pVnode != NULL) {
|
||||||
dDebug("vgId:%d, already exist", createReq.vgId);
|
dDebug("vgId:%d, already exist", createReq.vgId);
|
||||||
|
|
|
@ -771,7 +771,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
|
||||||
.ahandle = pTrans->rpcAHandle,
|
.ahandle = pTrans->rpcAHandle,
|
||||||
.pCont = rpcCont,
|
.pCont = rpcCont,
|
||||||
.contLen = pTrans->rpcRspLen};
|
.contLen = pTrans->rpcRspLen};
|
||||||
tmsgSendRsp(&pMnode->msgCb, &rspMsg);
|
tmsgSendRsp(&rspMsg);
|
||||||
pTrans->rpcHandle = NULL;
|
pTrans->rpcHandle = NULL;
|
||||||
pTrans->rpcRsp = NULL;
|
pTrans->rpcRsp = NULL;
|
||||||
pTrans->rpcRspLen = 0;
|
pTrans->rpcRspLen = 0;
|
||||||
|
@ -898,7 +898,7 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
|
||||||
pAction->msgReceived = 0;
|
pAction->msgReceived = 0;
|
||||||
pAction->errCode = 0;
|
pAction->errCode = 0;
|
||||||
} else {
|
} else {
|
||||||
mDebug("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
|
mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -938,7 +938,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
|
||||||
return errCode;
|
return errCode;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
mDebug("trans:%d, %d of %d actions executed, code:0x%04x", pTrans->id, numOfReceived, numOfActions, errCode & 0XFFFF);
|
mDebug("trans:%d, %d of %d actions executing", pTrans->id, numOfReceived, numOfActions);
|
||||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -275,7 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pMsg->pCont = NULL;
|
pMsg->pCont = NULL;
|
||||||
pMsg->contLen = 0;
|
pMsg->contLen = 0;
|
||||||
pMsg->code = -1;
|
pMsg->code = -1;
|
||||||
rpcSendResponse(pMsg);
|
tmsgSendRsp(pMsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -340,7 +340,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pMsg->pCont = buf;
|
pMsg->pCont = buf;
|
||||||
pMsg->contLen = tlen;
|
pMsg->contLen = tlen;
|
||||||
pMsg->code = 0;
|
pMsg->code = 0;
|
||||||
rpcSendResponse(pMsg);
|
tmsgSendRsp(pMsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -367,7 +367,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pMsg->pCont = buf;
|
pMsg->pCont = buf;
|
||||||
pMsg->contLen = tlen;
|
pMsg->contLen = tlen;
|
||||||
pMsg->code = 0;
|
pMsg->code = 0;
|
||||||
rpcSendResponse(pMsg);
|
tmsgSendRsp(pMsg);
|
||||||
/*}*/
|
/*}*/
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -205,8 +205,7 @@ _exit:
|
||||||
rpcMsg.contLen = rspLen;
|
rpcMsg.contLen = rspLen;
|
||||||
rpcMsg.code = code;
|
rpcMsg.code = code;
|
||||||
|
|
||||||
rpcSendResponse(&rpcMsg);
|
tmsgSendRsp(&rpcMsg);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -276,8 +275,7 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
.code = 0,
|
.code = 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
rpcSendResponse(&rpcMsg);
|
tmsgSendRsp(&rpcMsg);
|
||||||
|
|
||||||
taosArrayDestroyEx(pArray, freeItemHelper);
|
taosArrayDestroyEx(pArray, freeItemHelper);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,18 +30,18 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||||
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||||
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req);
|
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req);
|
||||||
|
|
||||||
int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code);
|
int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code);
|
||||||
int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code);
|
int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code);
|
||||||
int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
|
int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
|
||||||
int32_t code);
|
int32_t code);
|
||||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
|
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
|
||||||
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
|
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
|
||||||
int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code);
|
int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code);
|
||||||
int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code);
|
int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code);
|
||||||
void qwFreeFetchRsp(void *msg);
|
void qwFreeFetchRsp(void *msg);
|
||||||
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
|
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
|
||||||
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
|
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
|
||||||
int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
|
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
|
||||||
int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
|
int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -764,7 +764,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
||||||
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
||||||
dropConnection = NULL;
|
dropConnection = NULL;
|
||||||
|
|
||||||
qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code);
|
qwBuildAndSendDropRsp(&ctx->connInfo, code);
|
||||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
|
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
|
||||||
|
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
||||||
|
@ -802,7 +802,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
||||||
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
||||||
dropConnection = NULL;
|
dropConnection = NULL;
|
||||||
|
|
||||||
qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code);
|
qwBuildAndSendDropRsp(&ctx->connInfo, code);
|
||||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
|
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
|
||||||
|
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
||||||
|
@ -830,12 +830,12 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dropConnection) {
|
if (dropConnection) {
|
||||||
qwBuildAndSendDropRsp(&mgmt->msgCb, dropConnection, code);
|
qwBuildAndSendDropRsp(dropConnection, code);
|
||||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code));
|
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cancelConnection) {
|
if (cancelConnection) {
|
||||||
qwBuildAndSendCancelRsp(&mgmt->msgCb, cancelConnection, code);
|
qwBuildAndSendCancelRsp(cancelConnection, code);
|
||||||
QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
|
QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -886,7 +886,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code);
|
qwBuildAndSendDropRsp(&ctx->connInfo, code);
|
||||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
|
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
|
||||||
|
|
||||||
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
||||||
|
@ -918,7 +918,7 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
if (readyConnection) {
|
if (readyConnection) {
|
||||||
qwBuildAndSendReadyRsp(&mgmt->msgCb, readyConnection, code);
|
qwBuildAndSendReadyRsp(readyConnection, code);
|
||||||
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
|
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -970,7 +970,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
QW_ERR_JRET(qwBuildAndSendQueryRsp(&mgmt->msgCb, &qwMsg->connInfo, code));
|
QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
|
||||||
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||||
|
|
||||||
queryRsped = true;
|
queryRsped = true;
|
||||||
|
@ -988,7 +988,7 @@ _return:
|
||||||
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
|
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
|
||||||
|
|
||||||
if (!queryRsped) {
|
if (!queryRsped) {
|
||||||
qwBuildAndSendQueryRsp(&mgmt->msgCb, &qwMsg->connInfo, code);
|
qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
|
||||||
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1051,7 +1051,7 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
if (needRsp) {
|
if (needRsp) {
|
||||||
qwBuildAndSendReadyRsp(&mgmt->msgCb, &qwMsg->connInfo, code);
|
qwBuildAndSendReadyRsp(&qwMsg->connInfo, code);
|
||||||
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1095,7 +1095,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
qwMsg->connInfo = ctx->connInfo;
|
qwMsg->connInfo = ctx->connInfo;
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||||
|
|
||||||
qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, dataLen, code);
|
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
|
||||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
|
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
|
||||||
} else {
|
} else {
|
||||||
atomic_store_8((int8_t*)&ctx->queryContinue, 1);
|
atomic_store_8((int8_t*)&ctx->queryContinue, 1);
|
||||||
|
@ -1114,7 +1114,7 @@ _return:
|
||||||
rsp = NULL;
|
rsp = NULL;
|
||||||
|
|
||||||
qwMsg->connInfo = ctx->connInfo;
|
qwMsg->connInfo = ctx->connInfo;
|
||||||
qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, 0, code);
|
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code);
|
||||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0);
|
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1195,7 +1195,7 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code || rsp) {
|
if (code || rsp) {
|
||||||
qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, dataLen, code);
|
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
|
||||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
|
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1226,7 +1226,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
|
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
|
||||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
|
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
|
||||||
} else if (ctx->phase > 0) {
|
} else if (ctx->phase > 0) {
|
||||||
qwBuildAndSendDropRsp(&mgmt->msgCb, &qwMsg->connInfo, code);
|
qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
|
||||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||||
|
|
||||||
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
||||||
|
@ -1261,7 +1261,7 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
qwBuildAndSendDropRsp(&mgmt->msgCb, &qwMsg->connInfo, code);
|
qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
|
||||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1297,7 +1297,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
qwBuildAndSendHbRsp(&mgmt->msgCb, &qwMsg->connInfo, &rsp, code);
|
qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
|
||||||
QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||||
|
|
||||||
QW_RET(TSDB_CODE_SUCCESS);
|
QW_RET(TSDB_CODE_SUCCESS);
|
||||||
|
@ -1351,7 +1351,7 @@ _return:
|
||||||
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
||||||
|
|
||||||
for (int32_t j = 0; j < i; ++j) {
|
for (int32_t j = 0; j < i; ++j) {
|
||||||
qwBuildAndSendHbRsp(&mgmt->msgCb, &rspList[j].connInfo, &rspList[j].rsp, code);
|
qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
|
||||||
QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),
|
QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),
|
||||||
(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
|
(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
|
||||||
tFreeSSchedulerHbRsp(&rspList[j].rsp);
|
tFreeSSchedulerHbRsp(&rspList[j].rsp);
|
||||||
|
|
|
@ -46,7 +46,7 @@ void qwFreeFetchRsp(void *msg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) {
|
int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
|
||||||
SQueryTableRsp rsp = {.code = code};
|
SQueryTableRsp rsp = {.code = code};
|
||||||
|
|
||||||
int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
|
int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
|
||||||
|
@ -62,12 +62,12 @@ int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t
|
||||||
.code = code,
|
.code = code,
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgSendRsp(pMsgCb, &rpcRsp);
|
tmsgSendRsp(&rpcRsp);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) {
|
int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
|
||||||
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
|
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
|
||||||
pRsp->code = code;
|
pRsp->code = code;
|
||||||
|
|
||||||
|
@ -80,12 +80,12 @@ int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t
|
||||||
.code = code,
|
.code = code,
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgSendRsp(pMsgCb, &rpcRsp);
|
tmsgSendRsp(&rpcRsp);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
|
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
|
||||||
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
|
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
|
||||||
void *pRsp = rpcMallocCont(contLen);
|
void *pRsp = rpcMallocCont(contLen);
|
||||||
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
|
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
|
||||||
|
@ -99,12 +99,12 @@ int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SScheduler
|
||||||
.code = code,
|
.code = code,
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgSendRsp(pMsgCb, &rpcRsp);
|
tmsgSendRsp(&rpcRsp);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
|
int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
|
||||||
if (NULL == pRsp) {
|
if (NULL == pRsp) {
|
||||||
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||||
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
|
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
|
||||||
|
@ -120,12 +120,12 @@ int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrie
|
||||||
.code = code,
|
.code = code,
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgSendRsp(pMsgCb, &rpcRsp);
|
tmsgSendRsp(&rpcRsp);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) {
|
int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
|
||||||
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
|
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
|
||||||
pRsp->code = code;
|
pRsp->code = code;
|
||||||
|
|
||||||
|
@ -138,11 +138,11 @@ int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_
|
||||||
.code = code,
|
.code = code,
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgSendRsp(pMsgCb, &rpcRsp);
|
tmsgSendRsp(&rpcRsp);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) {
|
int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) {
|
||||||
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
|
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
|
||||||
pRsp->code = code;
|
pRsp->code = code;
|
||||||
|
|
||||||
|
@ -155,11 +155,11 @@ int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t
|
||||||
.code = code,
|
.code = code,
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgSendRsp(pMsgCb, &rpcRsp);
|
tmsgSendRsp(&rpcRsp);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwBuildAndSendShowRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, int32_t code) {
|
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
int32_t numOfCols = 6;
|
int32_t numOfCols = 6;
|
||||||
SVShowTablesRsp showRsp = {0};
|
SVShowTablesRsp showRsp = {0};
|
||||||
|
|
||||||
|
@ -210,11 +210,11 @@ int32_t qwBuildAndSendShowRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, int32_t code)
|
||||||
.code = code,
|
.code = code,
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgSendRsp(pMsgCb, &rpcMsg);
|
tmsgSendRsp(&rpcMsg);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwBuildAndSendShowFetchRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) {
|
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) {
|
||||||
SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
|
SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
|
||||||
int32_t handle = htonl(pFetchReq->id);
|
int32_t handle = htonl(pFetchReq->id);
|
||||||
|
|
||||||
|
@ -227,7 +227,7 @@ int32_t qwBuildAndSendShowFetchRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, SVShowTa
|
||||||
.code = 0,
|
.code = 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgSendRsp(pMsgCb, &rpcMsg);
|
tmsgSendRsp(&rpcMsg);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -498,7 +498,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
QW_ERR_RET(qwBuildAndSendCancelRsp(&mgmt->msgCb, &qwMsg.connInfo, code));
|
QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
|
||||||
QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));
|
QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -584,11 +584,9 @@ int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
SQWorkerMgmt *pMgmt = qWorkerMgmt;
|
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SVShowTablesReq *pReq = pMsg->pCont;
|
SVShowTablesReq *pReq = pMsg->pCont;
|
||||||
QW_RET(qwBuildAndSendShowRsp(&pMgmt->msgCb, pMsg, code));
|
QW_RET(qwBuildAndSendShowRsp(pMsg, code));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||||
|
@ -596,8 +594,6 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg)
|
||||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
SQWorkerMgmt *pMgmt = qWorkerMgmt;
|
|
||||||
|
|
||||||
SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
|
SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
|
||||||
QW_RET(qwBuildAndSendShowFetchRsp(&pMgmt->msgCb, pMsg, pFetchReq));
|
QW_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue