enh: refact fetch message
This commit is contained in:
parent
422735d3f1
commit
cafc5e4cae
|
@ -1629,7 +1629,6 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq);
|
|||
int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq);
|
||||
void tFreeSSubQueryMsg(SSubQueryMsg *pReq);
|
||||
|
||||
|
||||
typedef struct {
|
||||
SMsgHead header;
|
||||
uint64_t sId;
|
||||
|
@ -1667,6 +1666,10 @@ typedef struct {
|
|||
int32_t execId;
|
||||
} SResFetchReq;
|
||||
|
||||
int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq);
|
||||
int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq);
|
||||
|
||||
|
||||
typedef struct {
|
||||
SMsgHead header;
|
||||
uint64_t sId;
|
||||
|
|
|
@ -4723,6 +4723,61 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) {
|
|||
taosMemoryFreeClear(pReq->msg);
|
||||
}
|
||||
|
||||
|
||||
int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) {
|
||||
int32_t headLen = sizeof(SMsgHead);
|
||||
if (buf != NULL) {
|
||||
buf = (char *)buf + headLen;
|
||||
bufLen -= headLen;
|
||||
}
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
|
||||
if (tEncodeU64(&encoder, pReq->sId) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->execId) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
if (buf != NULL) {
|
||||
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
|
||||
pHead->vgId = htonl(pReq->header.vgId);
|
||||
pHead->contLen = htonl(tlen + headLen);
|
||||
}
|
||||
|
||||
return tlen + headLen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) {
|
||||
int32_t headLen = sizeof(SMsgHead);
|
||||
|
||||
SMsgHead *pHead = buf;
|
||||
pHead->vgId = pReq->header.vgId;
|
||||
pHead->contLen = pReq->header.contLen;
|
||||
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
|
||||
if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t tSerializeSTqOffsetVal(SEncoder *pEncoder, STqOffsetVal *pOffset) {
|
||||
if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1;
|
||||
|
|
|
@ -403,27 +403,42 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
|
|||
loadRemoteDataCallback(pWrapper, &pBuf, code);
|
||||
taosMemoryFree(pWrapper);
|
||||
} else {
|
||||
SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
|
||||
if (NULL == pMsg) {
|
||||
SResFetchReq req = {0};
|
||||
req.header.vgId = pSource->addr.nodeId;
|
||||
req.sId = pSource->schedId;
|
||||
req.taskId = pSource->taskId;
|
||||
req.queryId = pTaskInfo->id.queryId;
|
||||
req.execId = pSource->execId;
|
||||
|
||||
int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
|
||||
if (msgSize < 0) {
|
||||
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pWrapper);
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
|
||||
void* msg = taosMemoryCalloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pWrapper);
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
|
||||
if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) {
|
||||
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pWrapper);
|
||||
taosMemoryFree(msg);
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
|
||||
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId,
|
||||
pSource->execId, sourceIndex, totalSources);
|
||||
|
||||
pMsg->header.vgId = htonl(pSource->addr.nodeId);
|
||||
pMsg->sId = htobe64(pSource->schedId);
|
||||
pMsg->taskId = htobe64(pSource->taskId);
|
||||
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
|
||||
pMsg->execId = htonl(pSource->execId);
|
||||
|
||||
// send the fetch remote task result reques
|
||||
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (NULL == pMsgSendInfo) {
|
||||
taosMemoryFreeClear(pMsg);
|
||||
taosMemoryFreeClear(msg);
|
||||
taosMemoryFree(pWrapper);
|
||||
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
|
||||
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
|
@ -432,8 +447,8 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
|
|||
|
||||
pMsgSendInfo->param = pWrapper;
|
||||
pMsgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
pMsgSendInfo->msgInfo.pData = pMsg;
|
||||
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
|
||||
pMsgSendInfo->msgInfo.pData = msg;
|
||||
pMsgSendInfo->msgInfo.len = msgSize;
|
||||
pMsgSendInfo->msgType = pSource->fetchMsgType;
|
||||
pMsgSendInfo->fp = loadRemoteDataCallback;
|
||||
|
||||
|
|
|
@ -499,27 +499,22 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SResFetchReq *msg = pMsg->pCont;
|
||||
SResFetchReq req = {0};
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
if (tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
||||
QW_ELOG("tDeserializeSResFetchReq %d failed", pMsg->contLen);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
msg->sId = be64toh(msg->sId);
|
||||
msg->queryId = be64toh(msg->queryId);
|
||||
msg->taskId = be64toh(msg->taskId);
|
||||
msg->execId = ntohl(msg->execId);
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t tId = msg->taskId;
|
||||
uint64_t sId = req.sId;
|
||||
uint64_t qId = req.queryId;
|
||||
uint64_t tId = req.taskId;
|
||||
int64_t rId = 0;
|
||||
int32_t eId = msg->execId;
|
||||
int32_t eId = req.execId;
|
||||
|
||||
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType};
|
||||
|
||||
|
|
|
@ -1083,22 +1083,29 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
}
|
||||
case TDMT_SCH_FETCH:
|
||||
case TDMT_SCH_MERGE_FETCH: {
|
||||
msgSize = sizeof(SResFetchReq);
|
||||
SResFetchReq req = {0};
|
||||
req.header.vgId = addr->nodeId;
|
||||
req.sId = schMgmt.sId;
|
||||
req.queryId = pJob->queryId;
|
||||
req.taskId = pTask->taskId;
|
||||
req.execId = pTask->execId;
|
||||
|
||||
msgSize = tSerializeSResFetchReq(NULL, 0, &req);
|
||||
if (msgSize < 0) {
|
||||
SCH_TASK_ELOG("tSerializeSResFetchReq get size, msgSize:%d", msgSize);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
msg = taosMemoryCalloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
SCH_TASK_ELOG("calloc %d failed", msgSize);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SResFetchReq *pMsg = msg;
|
||||
|
||||
pMsg->header.vgId = htonl(addr->nodeId);
|
||||
|
||||
pMsg->sId = htobe64(schMgmt.sId);
|
||||
pMsg->queryId = htobe64(pJob->queryId);
|
||||
pMsg->taskId = htobe64(pTask->taskId);
|
||||
pMsg->execId = htonl(pTask->execId);
|
||||
|
||||
if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) {
|
||||
SCH_TASK_ELOG("tSerializeSResFetchReq %d failed", msgSize);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TDMT_SCH_DROP_TASK: {
|
||||
|
|
Loading…
Reference in New Issue