This commit is contained in:
Liu Jicong 2022-04-24 21:51:27 +08:00
parent e778e82498
commit df1ebbc25b
1 changed files with 150 additions and 138 deletions

View File

@ -1,14 +1,13 @@
#include "qworker.h"
#include "tcommon.h"
#include "qworkerMsg.h"
#include "dataSinkMgt.h"
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qworker.h"
#include "qworkerInt.h"
#include "qworkerMsg.h"
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"
#include "dataSinkMgt.h"
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
@ -26,8 +25,6 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
return TSDB_CODE_SUCCESS;
}
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
@ -39,7 +36,6 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete)
rsp->numOfRows = htonl(input->numOfRows);
}
void qwFreeFetchRsp(void *msg) {
if (msg) {
rpcFreeCont(msg);
@ -57,6 +53,7 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
.msgType = TDMT_VND_QUERY_RSP,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.pCont = msg,
.contLen = contLen,
.code = code,
@ -74,6 +71,7 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_RES_READY_RSP,
.handle = pConn->handle,
.refId = pConn->refId,
.ahandle = NULL,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
@ -96,6 +94,7 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo,
.msgType = TDMT_VND_EXPLAIN_RSP,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.pCont = pRsp,
.contLen = contLen,
.code = 0,
@ -115,6 +114,7 @@ int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_
.msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.pCont = pRsp,
.contLen = contLen,
.code = code,
@ -136,6 +136,7 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3
.msgType = TDMT_VND_FETCH_RSP,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.pCont = pRsp,
.contLen = sizeof(*pRsp) + dataLength,
.code = code,
@ -154,6 +155,7 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
.msgType = TDMT_VND_CANCEL_TASK_RSP,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
@ -171,6 +173,7 @@ int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) {
.msgType = TDMT_VND_DROP_TASK_RSP,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
@ -226,6 +229,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.refId = pMsg->refId,
.pCont = pBuf,
.contLen = bufLen,
.code = code,
@ -243,6 +247,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.refId = pMsg->refId,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = 0,
@ -268,6 +273,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE,
.refId = pConn->refId,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
@ -285,7 +291,6 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
return TSDB_CODE_SUCCESS;
}
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
if (NULL == req) {
@ -302,6 +307,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
SRpcMsg pMsg = {
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.msgType = TDMT_VND_DROP_TASK,
.pCont = req,
.contLen = sizeof(STaskDropReq),
@ -337,6 +343,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo
SRpcMsg pMsg = {
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.msgType = TDMT_VND_QUERY_HEARTBEAT,
.pCont = msg,
.contLen = msgSize,
@ -348,8 +355,6 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -379,6 +384,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
char *sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql);
@ -413,6 +419,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->handle);
@ -447,6 +454,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->handle);
@ -509,6 +517,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->handle);
@ -550,6 +559,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
// QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
@ -588,6 +598,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
@ -626,6 +637,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));