diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 5dfcb62fad..08123e6f4a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4723,6 +4723,84 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) { taosMemoryFreeClear(pReq->msg); } +int32_t tSerializeSTqOffsetVal(SEncoder *pEncoder, STqOffsetVal *pOffset) { + if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1; + if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1; + if (tEncodeI64(pEncoder, pOffset->ts) < 0) return -1; + + return 0; +} + +int32_t tDerializeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffset) { + if (tDecodeI8(pDecoder, &pOffset->type) < 0) return -1; + if (tDecodeI64(pDecoder, &pOffset->uid) < 0) return -1; + if (tDecodeI64(pDecoder, &pOffset->ts) < 0) return -1; + + return 0; +} + +int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + if (buf != NULL) { + buf = (char *)buf + headLen; + bufLen -= headLen; + } + + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeCStr(&encoder, pReq->subKey) < 0) return -1; + if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1; + if (tEncodeI8(&encoder, pReq->useSnapshot) < 0) return -1; + if (tEncodeI32(&encoder, pReq->epoch) < 0) return -1; + if (tEncodeU64(&encoder, pReq->reqId) < 0) return -1; + if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1; + if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1; + if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + if (buf != NULL) { + SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); + pHead->vgId = htonl(pReq->header.vgId); + pHead->contLen = htonl(tlen + headLen); + } + + return tlen + headLen; +} + +int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + + SMsgHead *pHead = buf; + pHead->vgId = pReq->header.vgId; + pHead->contLen = pReq->header.contLen; + + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeCStrTo(&decoder, pReq->subKey) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->useSnapshot) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->reqId) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->timeout) < 0) return -1; + if (tDerializeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + + int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) {