fix: add connection error handle

This commit is contained in:
Liu Jicong 2022-04-24 18:02:57 +08:00
parent 575f64197d
commit f4728e826e
2 changed files with 150 additions and 149 deletions

View File

@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h" #include "catalog.h"
#include "tdef.h"
#include "tname.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "catalog.h" #include "os.h"
#include "query.h" #include "query.h"
#include "tdef.h"
#include "tname.h"
int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
@ -50,7 +50,13 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SConnectRsp connectRsp = {0}; SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp); tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp);
assert(connectRsp.epSet.numOfEps > 0); /*assert(connectRsp.epSet.numOfEps > 0);*/
if (connectRsp.epSet.numOfEps == 0) {
taosMemoryFree(pMsg->pData);
setErrno(pRequest, TSDB_CODE_MND_APP_ERROR);
tsem_post(&pRequest->body.rspSem);
return code;
}
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet); updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
@ -82,18 +88,20 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return 0; return 0;
} }
SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
pMsgSendInfo->requestObjRefId = pRequest->self; pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->requestId = pRequest->requestId; pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->param = pRequest; pMsgSendInfo->param = pRequest;
pMsgSendInfo->msgType = pRequest->type; pMsgSendInfo->msgType = pRequest->type;
assert(pRequest != NULL); assert(pRequest != NULL);
pMsgSendInfo->msgInfo = pRequest->body.requestMsg; pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)]; pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)
? genericRspCallback
: handleRequestRspFp[TMSG_INDEX(pRequest->type)];
return pMsgSendInfo; return pMsgSendInfo;
} }
@ -114,7 +122,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if (TSDB_CODE_MND_DB_NOT_EXIST == code) { if (TSDB_CODE_MND_DB_NOT_EXIST == code) {
SUseDbRsp usedbRsp = {0}; SUseDbRsp usedbRsp = {0};
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
struct SCatalog *pCatalog = NULL; struct SCatalog* pCatalog = NULL;
if (usedbRsp.vgVersion >= 0) { if (usedbRsp.vgVersion >= 0) {
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);

View File

@ -1,14 +1,13 @@
#include "qworker.h" #include "qworkerMsg.h"
#include "tcommon.h" #include "dataSinkMgt.h"
#include "executor.h" #include "executor.h"
#include "planner.h" #include "planner.h"
#include "query.h" #include "query.h"
#include "qworker.h"
#include "qworkerInt.h" #include "qworkerInt.h"
#include "qworkerMsg.h" #include "tcommon.h"
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
#include "dataSinkMgt.h"
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
int32_t msgSize = sizeof(SRetrieveTableRsp) + length; int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
@ -26,8 +25,6 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) { void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
@ -39,7 +36,6 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete)
rsp->numOfRows = htonl(input->numOfRows); rsp->numOfRows = htonl(input->numOfRows);
} }
void qwFreeFetchRsp(void *msg) { void qwFreeFetchRsp(void *msg) {
if (msg) { if (msg) {
rpcFreeCont(msg); rpcFreeCont(msg);
@ -50,16 +46,16 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
SQueryTableRsp rsp = {.code = code}; SQueryTableRsp rsp = {.code = code};
int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
void *msg = rpcMallocCont(contLen); void *msg = rpcMallocCont(contLen);
tSerializeSQueryTableRsp(msg, contLen, &rsp); tSerializeSQueryTableRsp(msg, contLen, &rsp);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_RSP, .msgType = TDMT_VND_QUERY_RSP,
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.pCont = msg, .pCont = msg,
.contLen = contLen, .contLen = contLen,
.code = code, .code = code,
}; };
tmsgSendRsp(&rpcRsp); tmsgSendRsp(&rpcRsp);
@ -72,12 +68,12 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
pRsp->code = code; pRsp->code = code;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_RES_READY_RSP, .msgType = TDMT_VND_RES_READY_RSP,
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = NULL, .ahandle = NULL,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp), .contLen = sizeof(*pRsp),
.code = code, .code = code,
}; };
tmsgSendRsp(&rpcRsp); tmsgSendRsp(&rpcRsp);
@ -89,16 +85,16 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo,
SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo}; SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};
int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp); int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(contLen); void *pRsp = rpcMallocCont(contLen);
tSerializeSExplainRsp(pRsp, contLen, &rsp); tSerializeSExplainRsp(pRsp, contLen, &rsp);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_EXPLAIN_RSP, .msgType = TDMT_VND_EXPLAIN_RSP,
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.pCont = pRsp, .pCont = pRsp,
.contLen = contLen, .contLen = contLen,
.code = 0, .code = 0,
}; };
tmsgSendRsp(&rpcRsp); tmsgSendRsp(&rpcRsp);
@ -108,16 +104,16 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo,
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
void *pRsp = rpcMallocCont(contLen); void *pRsp = rpcMallocCont(contLen);
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_HEARTBEAT_RSP, .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.pCont = pRsp, .pCont = pRsp,
.contLen = contLen, .contLen = contLen,
.code = code, .code = code,
}; };
tmsgSendRsp(&rpcRsp); tmsgSendRsp(&rpcRsp);
@ -133,12 +129,12 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3
} }
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_FETCH_RSP, .msgType = TDMT_VND_FETCH_RSP,
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp) + dataLength, .contLen = sizeof(*pRsp) + dataLength,
.code = code, .code = code,
}; };
tmsgSendRsp(&rpcRsp); tmsgSendRsp(&rpcRsp);
@ -151,12 +147,12 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
pRsp->code = code; pRsp->code = code;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_CANCEL_TASK_RSP, .msgType = TDMT_VND_CANCEL_TASK_RSP,
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp), .contLen = sizeof(*pRsp),
.code = code, .code = code,
}; };
tmsgSendRsp(&rpcRsp); tmsgSendRsp(&rpcRsp);
@ -168,12 +164,12 @@ int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) {
pRsp->code = code; pRsp->code = code;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_DROP_TASK_RSP, .msgType = TDMT_VND_DROP_TASK_RSP,
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp), .contLen = sizeof(*pRsp),
.code = code, .code = code,
}; };
tmsgSendRsp(&rpcRsp); tmsgSendRsp(&rpcRsp);
@ -191,7 +187,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
return -1; return -1;
} }
col_id_t cols = 0; col_id_t cols = 0;
SSchema *pSchema = showRsp.tableMeta.pSchemas; SSchema *pSchema = showRsp.tableMeta.pSchemas;
const SSchema *s = tGetTbnameColumnSchema(); const SSchema *s = tGetTbnameColumnSchema();
@ -235,17 +231,17 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) { int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchReq) {
SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp)); SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
int32_t handle = htonl(pFetchReq->id); int32_t handle = htonl(pFetchReq->id);
pRsp->numOfRows = 0; pRsp->numOfRows = 0;
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.handle = pMsg->handle, .handle = pMsg->handle,
.ahandle = pMsg->ahandle, .ahandle = pMsg->ahandle,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp), .contLen = sizeof(*pRsp),
.code = 0, .code = 0,
}; };
tmsgSendRsp(&rpcMsg); tmsgSendRsp(&rpcMsg);
@ -253,7 +249,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
} }
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == req) { if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq)); QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -265,12 +261,12 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
req->taskId = tId; req->taskId = tId;
SRpcMsg pNewMsg = { SRpcMsg pNewMsg = {
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE, .msgType = TDMT_VND_QUERY_CONTINUE,
.pCont = req, .pCont = req,
.contLen = sizeof(SQueryContinueReq), .contLen = sizeof(SQueryContinueReq),
.code = 0, .code = 0,
}; };
int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &pNewMsg); int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &pNewMsg);
@ -285,9 +281,8 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
STaskDropReq * req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
if (NULL == req) { if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq)); QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -300,12 +295,12 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
req->refId = htobe64(rId); req->refId = htobe64(rId);
SRpcMsg pMsg = { SRpcMsg pMsg = {
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.msgType = TDMT_VND_DROP_TASK, .msgType = TDMT_VND_DROP_TASK,
.pCont = req, .pCont = req,
.contLen = sizeof(STaskDropReq), .contLen = sizeof(STaskDropReq),
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL, .code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
}; };
tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg); tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg);
@ -335,12 +330,12 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo
} }
SRpcMsg pMsg = { SRpcMsg pMsg = {
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.msgType = TDMT_VND_QUERY_HEARTBEAT, .msgType = TDMT_VND_QUERY_HEARTBEAT,
.pCont = msg, .pCont = msg,
.contLen = sizeof(SSchedulerHbReq), .contLen = msgSize,
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL, .code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
}; };
tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg); tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg);
@ -348,14 +343,12 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
int32_t code = 0; int32_t code = 0;
SSubQueryMsg *msg = pMsg->pCont; SSubQueryMsg *msg = pMsg->pCont;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
@ -364,12 +357,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
msg->sId = be64toh(msg->sId); msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId); msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId); msg->taskId = be64toh(msg->taskId);
msg->refId = be64toh(msg->refId); msg->refId = be64toh(msg->refId);
msg->phyLen = ntohl(msg->phyLen); msg->phyLen = ntohl(msg->phyLen);
msg->sqlLen = ntohl(msg->sqlLen); msg->sqlLen = ntohl(msg->sqlLen);
uint64_t sId = msg->sId; uint64_t sId = msg->sId;
uint64_t qId = msg->queryId; uint64_t qId = msg->queryId;
@ -380,7 +373,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle; qwMsg.connInfo.ahandle = pMsg->ahandle;
char* sql = strndup(msg->msg, msg->sqlLen); char *sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql); QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql);
taosMemoryFreeClear(sql); taosMemoryFreeClear(sql);
@ -392,13 +385,13 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
} }
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
int8_t status = 0; int8_t status = 0;
bool queryDone = false; bool queryDone = false;
SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont; SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
bool needStop = false; bool needStop = false;
SQWTaskCtx *handles = NULL; SQWTaskCtx *handles = NULL;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
@ -408,7 +401,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t sId = msg->sId; uint64_t sId = msg->sId;
uint64_t qId = msg->queryId; uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId; uint64_t tId = msg->taskId;
int64_t rId = 0; int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
@ -423,7 +416,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
@ -442,7 +435,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
uint64_t sId = msg->sId; uint64_t sId = msg->sId;
uint64_t qId = msg->queryId; uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId; uint64_t tId = msg->taskId;
int64_t rId = 0; int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
@ -462,7 +455,7 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
int32_t code = 0; int32_t code = 0;
SSchTasksStatusReq *msg = pMsg->pCont; SSchTasksStatusReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg"); qError("invalid task status msg");
@ -475,11 +468,11 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SSchedulerStatusRsp *sStatus = NULL; SSchedulerStatusRsp *sStatus = NULL;
//QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus)); // QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
_return: _return:
//QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus)); // QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -504,7 +497,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t sId = msg->sId; uint64_t sId = msg->sId;
uint64_t qId = msg->queryId; uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId; uint64_t tId = msg->taskId;
int64_t rId = 0; int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
@ -529,8 +522,8 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
int32_t code = 0; int32_t code = 0;
STaskCancelReq *msg = pMsg->pCont; STaskCancelReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task cancel msg"); qError("invalid task cancel msg");
@ -551,7 +544,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle; qwMsg.connInfo.ahandle = pMsg->ahandle;
//QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); // QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
_return: _return:
@ -566,7 +559,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
int32_t code = 0; int32_t code = 0;
STaskDropReq *msg = pMsg->pCont; STaskDropReq *msg = pMsg->pCont;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
@ -607,9 +600,9 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
int32_t code = 0; int32_t code = 0;
SSchedulerHbReq req = {0}; SSchedulerHbReq req = {0};
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == pMsg->pCont) { if (NULL == pMsg->pCont) {
QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen); QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
@ -623,7 +616,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
} }
uint64_t sId = req.sId; uint64_t sId = req.sId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle; qwMsg.connInfo.ahandle = pMsg->ahandle;
@ -645,7 +638,7 @@ int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
int32_t code = 0; int32_t code = 0;
SVShowTablesReq *pReq = pMsg->pCont; SVShowTablesReq *pReq = pMsg->pCont;
QW_RET(qwBuildAndSendShowRsp(pMsg, code)); QW_RET(qwBuildAndSendShowRsp(pMsg, code));
} }