log(tmq): add some logs.

This commit is contained in:
Haojun Liao 2023-05-17 11:37:38 +08:00
parent 573a86ed99
commit 3896e148fd
1 changed files with 7 additions and 0 deletions

View File

@ -319,11 +319,15 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen)
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
tqError("vgId:%d failed to decode seek msg", vgId);
return -1;
}
tDecoderClear(&decoder);
tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);
STqOffset* pOffset = &vgOffset.offset;
if (pOffset->val.type != TMQ_OFFSET__LOG) {
tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
@ -385,6 +389,9 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen)
return -1;
}
tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
vgOffset.consumerId, vgOffset.offset.val.version);
return 0;
}