Merge pull request #18258 from taosdata/enh/msgRefactor1
enh: refact some messages
This commit is contained in:
commit
e2e52897a2
|
@ -1615,15 +1615,21 @@ typedef struct SSubQueryMsg {
|
|||
uint64_t taskId;
|
||||
int64_t refId;
|
||||
int32_t execId;
|
||||
int32_t msgMask;
|
||||
int8_t taskType;
|
||||
int8_t explain;
|
||||
int8_t needFetch;
|
||||
uint32_t sqlLen; // the query sql,
|
||||
uint32_t phyLen;
|
||||
int32_t msgMask;
|
||||
char msg[];
|
||||
uint32_t sqlLen;
|
||||
char *sql;
|
||||
uint32_t msgLen;
|
||||
char *msg;
|
||||
} SSubQueryMsg;
|
||||
|
||||
int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq);
|
||||
int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq);
|
||||
void tFreeSSubQueryMsg(SSubQueryMsg *pReq);
|
||||
|
||||
|
||||
typedef struct {
|
||||
SMsgHead header;
|
||||
uint64_t sId;
|
||||
|
@ -1732,6 +1738,13 @@ typedef struct {
|
|||
int32_t execId;
|
||||
} STaskDropReq;
|
||||
|
||||
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq);
|
||||
int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq);
|
||||
|
||||
int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
|
||||
int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
|
||||
|
||||
|
||||
typedef struct {
|
||||
int32_t code;
|
||||
} STaskDropRsp;
|
||||
|
|
|
@ -4643,6 +4643,178 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) {
|
||||
int32_t headLen = sizeof(SMsgHead);
|
||||
if (buf != NULL) {
|
||||
buf = (char *)buf + headLen;
|
||||
bufLen -= headLen;
|
||||
}
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
|
||||
if (tEncodeU64(&encoder, pReq->sId) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->refId) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->execId) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->msgMask) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->taskType) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->explain) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->needFetch) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1;
|
||||
if (tEncodeCStrWithLen(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pReq->msgLen) < 0) return -1;
|
||||
if (tEncodeBinary(&encoder, (uint8_t*)pReq->msg, pReq->msgLen) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
if (buf != NULL) {
|
||||
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
|
||||
pHead->vgId = htonl(pReq->header.vgId);
|
||||
pHead->contLen = htonl(tlen + headLen);
|
||||
}
|
||||
|
||||
return tlen + headLen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) {
|
||||
int32_t headLen = sizeof(SMsgHead);
|
||||
|
||||
SMsgHead *pHead = buf;
|
||||
pHead->vgId = pReq->header.vgId;
|
||||
pHead->contLen = pReq->header.contLen;
|
||||
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
|
||||
if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->refId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->msgMask) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->taskType) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->explain) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->needFetch) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pReq->msgLen) < 0) return -1;
|
||||
if (tDecodeBinaryAlloc(&decoder, (void**)&pReq->msg, NULL) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSSubQueryMsg(SSubQueryMsg *pReq) {
|
||||
if (NULL == pReq) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pReq->sql);
|
||||
taosMemoryFreeClear(pReq->msg);
|
||||
}
|
||||
|
||||
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) {
|
||||
int32_t headLen = sizeof(SMsgHead);
|
||||
if (buf != NULL) {
|
||||
buf = (char *)buf + headLen;
|
||||
bufLen -= headLen;
|
||||
}
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
|
||||
if (tEncodeU64(&encoder, pReq->sId) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->refId) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->execId) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
if (buf != NULL) {
|
||||
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
|
||||
pHead->vgId = htonl(pReq->header.vgId);
|
||||
pHead->contLen = htonl(tlen + headLen);
|
||||
}
|
||||
|
||||
return tlen + headLen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) {
|
||||
int32_t headLen = sizeof(SMsgHead);
|
||||
|
||||
SMsgHead *pHead = buf;
|
||||
pHead->vgId = pReq->header.vgId;
|
||||
pHead->contLen = pReq->header.contLen;
|
||||
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
|
||||
if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->refId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
|
||||
if (tEncodeI32(&encoder, pRsp->code) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pRsp->tbFName) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->sversion) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->tversion) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pRsp->affectedRows) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, (char *)buf, bufLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
|
||||
if (tDecodeI32(&decoder, &pRsp->code) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pRsp->tbFName) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->sversion) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->tversion) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pRsp->affectedRows) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
|
||||
int32_t headLen = sizeof(SMsgHead);
|
||||
|
|
|
@ -65,19 +65,37 @@ int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c
|
|||
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) {
|
||||
STbVerInfo *tbInfo = ctx ? &ctx->tbInfo : NULL;
|
||||
int64_t affectedRows = ctx ? ctx->affectedRows : 0;
|
||||
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
|
||||
pRsp->code = htonl(code);
|
||||
pRsp->affectedRows = htobe64(affectedRows);
|
||||
SQueryTableRsp rsp = {0};
|
||||
rsp.code = code;
|
||||
rsp.affectedRows = affectedRows;
|
||||
|
||||
if (tbInfo) {
|
||||
strcpy(pRsp->tbFName, tbInfo->tbFName);
|
||||
pRsp->sversion = htonl(tbInfo->sversion);
|
||||
pRsp->tversion = htonl(tbInfo->tversion);
|
||||
strcpy(rsp.tbFName, tbInfo->tbFName);
|
||||
rsp.sversion = tbInfo->sversion;
|
||||
rsp.tversion = tbInfo->tversion;
|
||||
}
|
||||
|
||||
int32_t msgSize = tSerializeSQueryTableRsp(NULL, 0, &rsp);
|
||||
if (msgSize < 0) {
|
||||
qError("tSerializeSQueryTableRsp failed");
|
||||
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
void *pRsp = rpcMallocCont(msgSize);
|
||||
if (NULL == pRsp) {
|
||||
qError("rpcMallocCont %d failed", msgSize);
|
||||
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (tSerializeSQueryTableRsp(pRsp, msgSize, &rsp) < 0) {
|
||||
qError("tSerializeSQueryTableRsp %d failed", msgSize);
|
||||
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.msgType = rspType,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp),
|
||||
.contLen = msgSize,
|
||||
.code = code,
|
||||
.info = *pConn,
|
||||
};
|
||||
|
@ -182,23 +200,37 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
|
|||
#endif
|
||||
|
||||
int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
|
||||
STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
|
||||
if (NULL == req) {
|
||||
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
|
||||
STaskDropReq qMsg;
|
||||
qMsg.header.vgId = mgmt->nodeId;
|
||||
qMsg.header.contLen = 0;
|
||||
qMsg.sId = sId;
|
||||
qMsg.queryId = qId;
|
||||
qMsg.taskId = tId;
|
||||
qMsg.refId = rId;
|
||||
qMsg.execId = eId;
|
||||
|
||||
int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
|
||||
if (msgSize < 0) {
|
||||
QW_SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
req->header.vgId = mgmt->nodeId;
|
||||
req->sId = sId;
|
||||
req->queryId = qId;
|
||||
req->taskId = tId;
|
||||
req->refId = rId;
|
||||
req->execId = eId;
|
||||
void *msg = rpcMallocCont(msgSize);
|
||||
if (NULL == msg) {
|
||||
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", msgSize);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (tSerializeSTaskDropReq(msg, msgSize, &qMsg) < 0) {
|
||||
QW_SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize);
|
||||
rpcFreeCont(msg);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SRpcMsg pNewMsg = {
|
||||
.msgType = TDMT_SCH_DROP_TASK,
|
||||
.pCont = req,
|
||||
.contLen = sizeof(STaskDropReq),
|
||||
.pCont = msg,
|
||||
.contLen = msgSize,
|
||||
.code = 0,
|
||||
.info = *pConn,
|
||||
};
|
||||
|
@ -247,22 +279,37 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
|
|||
}
|
||||
|
||||
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
|
||||
STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
|
||||
if (NULL == req) {
|
||||
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
|
||||
STaskDropReq qMsg;
|
||||
qMsg.header.vgId = mgmt->nodeId;
|
||||
qMsg.header.contLen = 0;
|
||||
qMsg.sId = sId;
|
||||
qMsg.queryId = qId;
|
||||
qMsg.taskId = tId;
|
||||
qMsg.refId = rId;
|
||||
qMsg.execId = eId;
|
||||
|
||||
int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
|
||||
if (msgSize < 0) {
|
||||
QW_SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
req->header.vgId = htonl(mgmt->nodeId);
|
||||
req->sId = htobe64(sId);
|
||||
req->queryId = htobe64(qId);
|
||||
req->taskId = htobe64(tId);
|
||||
req->refId = htobe64(rId);
|
||||
void *msg = rpcMallocCont(msgSize);
|
||||
if (NULL == msg) {
|
||||
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", msgSize);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (tSerializeSTaskDropReq(msg, msgSize, &qMsg) < 0) {
|
||||
QW_SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize);
|
||||
rpcFreeCont(msg);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SRpcMsg brokenMsg = {
|
||||
.msgType = TDMT_SCH_DROP_TASK,
|
||||
.pCont = req,
|
||||
.contLen = sizeof(STaskDropReq),
|
||||
.pCont = msg,
|
||||
.contLen = msgSize,
|
||||
.code = TSDB_CODE_RPC_BROKEN_LINK,
|
||||
.info = *pConn,
|
||||
};
|
||||
|
@ -312,40 +359,33 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran
|
|||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SSubQueryMsg *msg = pMsg->pCont;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
||||
QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
SSubQueryMsg msg = {0};
|
||||
if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
|
||||
QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
msg->sId = be64toh(msg->sId);
|
||||
msg->queryId = be64toh(msg->queryId);
|
||||
msg->taskId = be64toh(msg->taskId);
|
||||
msg->refId = be64toh(msg->refId);
|
||||
msg->execId = ntohl(msg->execId);
|
||||
msg->phyLen = ntohl(msg->phyLen);
|
||||
msg->sqlLen = ntohl(msg->sqlLen);
|
||||
msg->msgMask = ntohl(msg->msgMask);
|
||||
|
||||
if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg->msgMask)) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) {
|
||||
QW_ELOG("query failed cause of grant expired, msgMask:%d", msg->msgMask);
|
||||
if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg.msgMask)) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) {
|
||||
QW_ELOG("query failed cause of grant expired, msgMask:%d", msg.msgMask);
|
||||
tFreeSSubQueryMsg(&msg);
|
||||
QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
|
||||
}
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t tId = msg->taskId;
|
||||
int64_t rId = msg->refId;
|
||||
int32_t eId = msg->execId;
|
||||
uint64_t sId = msg.sId;
|
||||
uint64_t qId = msg.queryId;
|
||||
uint64_t tId = msg.taskId;
|
||||
int64_t rId = msg.refId;
|
||||
int32_t eId = msg.execId;
|
||||
|
||||
SQWMsg qwMsg = {
|
||||
.msgType = pMsg->msgType, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
|
||||
.msgType = pMsg->msgType, .msg = msg.msg, .msgLen = msg.msgLen, .connInfo = pMsg->info};
|
||||
|
||||
QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle);
|
||||
QW_ERR_RET(qwPreprocessQuery(QW_FPARAMS(), &qwMsg));
|
||||
QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p", pMsg->info.handle);
|
||||
QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p, SQL:%s", pMsg->info.handle, msg.sql);
|
||||
code = qwPreprocessQuery(QW_FPARAMS(), &qwMsg);
|
||||
QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p, code:%x", pMsg->info.handle, code);
|
||||
|
||||
tFreeSSubQueryMsg(&msg);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -355,19 +395,25 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SSubQueryMsg *msg = pMsg->pCont;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SSubQueryMsg msg = {0};
|
||||
if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
|
||||
QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t tId = msg->taskId;
|
||||
int64_t rId = msg->refId;
|
||||
int32_t eId = msg->execId;
|
||||
uint64_t sId = msg.sId;
|
||||
uint64_t qId = msg.queryId;
|
||||
uint64_t tId = msg.taskId;
|
||||
int64_t rId = msg.refId;
|
||||
int32_t eId = msg.execId;
|
||||
|
||||
QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
|
||||
qwAbortPrerocessQuery(QW_FPARAMS());
|
||||
QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle);
|
||||
|
||||
tFreeSSubQueryMsg(&msg);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -377,42 +423,41 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SSubQueryMsg *msg = pMsg->pCont;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
|
||||
QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);
|
||||
|
||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
||||
QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
SSubQueryMsg msg = {0};
|
||||
if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
|
||||
QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t tId = msg->taskId;
|
||||
int64_t rId = msg->refId;
|
||||
int32_t eId = msg->execId;
|
||||
uint64_t sId = msg.sId;
|
||||
uint64_t qId = msg.queryId;
|
||||
uint64_t tId = msg.taskId;
|
||||
int64_t rId = msg.refId;
|
||||
int32_t eId = msg.execId;
|
||||
|
||||
SQWMsg qwMsg = {.node = node,
|
||||
.msg = msg->msg + msg->sqlLen,
|
||||
.msgLen = msg->phyLen,
|
||||
.msg = msg.msg,
|
||||
.msgLen = msg.msgLen,
|
||||
.connInfo = pMsg->info,
|
||||
.msgType = pMsg->msgType};
|
||||
qwMsg.msgInfo.explain = msg->explain;
|
||||
qwMsg.msgInfo.taskType = msg->taskType;
|
||||
qwMsg.msgInfo.needFetch = msg->needFetch;
|
||||
qwMsg.msgInfo.explain = msg.explain;
|
||||
qwMsg.msgInfo.taskType = msg.taskType;
|
||||
qwMsg.msgInfo.needFetch = msg.needFetch;
|
||||
|
||||
char *sql = strndup(msg->msg, msg->sqlLen);
|
||||
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType),
|
||||
pMsg->info.handle, sql);
|
||||
QW_ERR_JRET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql));
|
||||
pMsg->info.handle, msg.sql);
|
||||
code = qwProcessQuery(QW_FPARAMS(), &qwMsg, msg.sql);
|
||||
msg.sql = NULL;
|
||||
QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%x", node, code);
|
||||
|
||||
_return:
|
||||
tFreeSSubQueryMsg(&msg);
|
||||
|
||||
QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%d", node, code);
|
||||
|
||||
return code;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
|
||||
|
@ -548,28 +593,22 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
|
|||
}
|
||||
|
||||
int32_t code = 0;
|
||||
STaskDropReq *msg = pMsg->pCont;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
STaskDropReq msg = {0};
|
||||
if (tDeserializeSTaskDropReq(pMsg->pCont, pMsg->contLen, &msg) < 0) {
|
||||
QW_ELOG("tDeserializeSTaskDropReq failed, contLen:%d", pMsg->contLen);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
msg->sId = be64toh(msg->sId);
|
||||
msg->queryId = be64toh(msg->queryId);
|
||||
msg->taskId = be64toh(msg->taskId);
|
||||
msg->refId = be64toh(msg->refId);
|
||||
msg->execId = ntohl(msg->execId);
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t tId = msg->taskId;
|
||||
int64_t rId = msg->refId;
|
||||
int32_t eId = msg->execId;
|
||||
uint64_t sId = msg.sId;
|
||||
uint64_t qId = msg.queryId;
|
||||
uint64_t tId = msg.taskId;
|
||||
int64_t rId = msg.refId;
|
||||
int32_t eId = msg.execId;
|
||||
|
||||
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
|
||||
|
||||
|
|
|
@ -114,7 +114,7 @@ void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
|
|||
qwtqueryMsg.queryId = htobe64(atomic_add_fetch_64(&qwtTestQueryId, 1));
|
||||
qwtqueryMsg.sId = htobe64(1);
|
||||
qwtqueryMsg.taskId = htobe64(1);
|
||||
qwtqueryMsg.phyLen = htonl(100);
|
||||
qwtqueryMsg.msgLen = htonl(100);
|
||||
qwtqueryMsg.sqlLen = 0;
|
||||
queryRpc->msgType = TDMT_SCH_QUERY;
|
||||
queryRpc->pCont = &qwtqueryMsg;
|
||||
|
@ -131,12 +131,29 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
|
|||
}
|
||||
|
||||
void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
|
||||
dropMsg->sId = htobe64(1);
|
||||
dropMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
|
||||
dropMsg->taskId = htobe64(1);
|
||||
dropMsg->sId = 1;
|
||||
dropMsg->queryId = atomic_load_64(&qwtTestQueryId);
|
||||
dropMsg->taskId = 1;
|
||||
|
||||
int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, dropMsg);
|
||||
if (msgSize < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
char *msg = (char*)taosMemoryCalloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (tSerializeSTaskDropReq(msg, msgSize, dropMsg) < 0) {
|
||||
taosMemoryFree(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
dropRpc->msgType = TDMT_SCH_DROP_TASK;
|
||||
dropRpc->pCont = dropMsg;
|
||||
dropRpc->contLen = sizeof(STaskDropReq);
|
||||
dropRpc->pCont = msg;
|
||||
dropRpc->contLen = msgSize;
|
||||
}
|
||||
|
||||
int32_t qwtStringToPlan(const char *str, SSubplan **subplan) {
|
||||
|
|
|
@ -334,17 +334,17 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
|
|||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
|
||||
rsp->code = ntohl(rsp->code);
|
||||
rsp->sversion = ntohl(rsp->sversion);
|
||||
rsp->tversion = ntohl(rsp->tversion);
|
||||
rsp->affectedRows = be64toh(rsp->affectedRows);
|
||||
SQueryTableRsp rsp = {0};
|
||||
if (tDeserializeSQueryTableRsp(msg, msgSize, &rsp) < 0) {
|
||||
SCH_TASK_ELOG("tDeserializeSQueryTableRsp failed, msgSize:%d", msgSize);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_MSG);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(rsp->code);
|
||||
SCH_ERR_JRET(rsp.code);
|
||||
|
||||
SCH_ERR_JRET(schSaveJobExecRes(pJob, rsp));
|
||||
SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp));
|
||||
|
||||
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
|
||||
atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows);
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
|
@ -1042,30 +1042,40 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
case TDMT_SCH_MERGE_QUERY: {
|
||||
SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));
|
||||
|
||||
uint32_t len = strlen(pJob->sql);
|
||||
msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + len;
|
||||
SSubQueryMsg qMsg;
|
||||
qMsg.header.vgId = addr->nodeId;
|
||||
qMsg.header.contLen = 0;
|
||||
qMsg.sId = schMgmt.sId;
|
||||
qMsg.queryId = pJob->queryId;
|
||||
qMsg.taskId = pTask->taskId;
|
||||
qMsg.refId = pJob->refId;
|
||||
qMsg.execId = pTask->execId;
|
||||
qMsg.msgMask = (pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0;
|
||||
qMsg.taskType = TASK_TYPE_TEMP;
|
||||
qMsg.explain = SCH_IS_EXPLAIN_JOB(pJob);
|
||||
qMsg.needFetch = SCH_TASK_NEED_FETCH(pTask);
|
||||
qMsg.sqlLen = strlen(pJob->sql);
|
||||
qMsg.sql = pJob->sql;
|
||||
qMsg.msgLen = pTask->msgLen;
|
||||
qMsg.msg = pTask->msg;
|
||||
|
||||
msgSize = tSerializeSSubQueryMsg(NULL, 0, &qMsg);
|
||||
if (msgSize < 0) {
|
||||
SCH_TASK_ELOG("tSerializeSSubQueryMsg get size, msgSize:%d", msgSize);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
msg = taosMemoryCalloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
SCH_TASK_ELOG("calloc %d failed", msgSize);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SSubQueryMsg *pMsg = msg;
|
||||
pMsg->header.vgId = htonl(addr->nodeId);
|
||||
pMsg->sId = htobe64(schMgmt.sId);
|
||||
pMsg->queryId = htobe64(pJob->queryId);
|
||||
pMsg->taskId = htobe64(pTask->taskId);
|
||||
pMsg->refId = htobe64(pJob->refId);
|
||||
pMsg->execId = htonl(pTask->execId);
|
||||
pMsg->taskType = TASK_TYPE_TEMP;
|
||||
pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
|
||||
pMsg->needFetch = SCH_TASK_NEED_FETCH(pTask);
|
||||
pMsg->phyLen = htonl(pTask->msgLen);
|
||||
pMsg->sqlLen = htonl(len);
|
||||
pMsg->msgMask = htonl((pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0);
|
||||
|
||||
memcpy(pMsg->msg, pJob->sql, len);
|
||||
memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);
|
||||
if (tSerializeSSubQueryMsg(msg, msgSize, &qMsg) < 0) {
|
||||
SCH_TASK_ELOG("tSerializeSSubQueryMsg failed, msgSize:%d", msgSize);
|
||||
taosMemoryFree(msg);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
persistHandle = true;
|
||||
SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle());
|
||||
|
@ -1092,22 +1102,32 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
break;
|
||||
}
|
||||
case TDMT_SCH_DROP_TASK: {
|
||||
msgSize = sizeof(STaskDropReq);
|
||||
STaskDropReq qMsg;
|
||||
qMsg.header.vgId = addr->nodeId;
|
||||
qMsg.header.contLen = 0;
|
||||
qMsg.sId = schMgmt.sId;
|
||||
qMsg.queryId = pJob->queryId;
|
||||
qMsg.taskId = pTask->taskId;
|
||||
qMsg.refId = pJob->refId;
|
||||
qMsg.execId = pTask->execId;
|
||||
|
||||
msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
|
||||
if (msgSize < 0) {
|
||||
SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
msg = taosMemoryCalloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
SCH_TASK_ELOG("calloc %d failed", msgSize);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
STaskDropReq *pMsg = msg;
|
||||
|
||||
pMsg->header.vgId = htonl(addr->nodeId);
|
||||
|
||||
pMsg->sId = htobe64(schMgmt.sId);
|
||||
pMsg->queryId = htobe64(pJob->queryId);
|
||||
pMsg->taskId = htobe64(pTask->taskId);
|
||||
pMsg->refId = htobe64(pJob->refId);
|
||||
pMsg->execId = htonl(pTask->execId);
|
||||
if (tSerializeSTaskDropReq(msg, msgSize, &qMsg) < 0) {
|
||||
SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize);
|
||||
taosMemoryFree(msg);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TDMT_SCH_QUERY_HEARTBEAT: {
|
||||
|
|
Loading…
Reference in New Issue