fix(tmq): add some logs.
This commit is contained in:
parent
a24d537d9e
commit
2517f9bb83
|
@ -580,7 +580,10 @@ static int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, t
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
taosThreadMutexLock(&tmq->lock);
|
taosThreadMutexLock(&tmq->lock);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
|
||||||
|
|
||||||
|
tscDebug("consumer:0x%" PRIx64 " user invoked commit offset for %d", tmq->consumerId, numOfTopics);
|
||||||
|
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
if (strcmp(pTopic->topicName, topic) != 0) {
|
if (strcmp(pTopic->topicName, topic) != 0) {
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -296,51 +296,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// int32_t len = 0;
|
|
||||||
// int32_t code = 0;
|
|
||||||
//
|
|
||||||
// if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
|
||||||
// tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
|
||||||
// } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
|
||||||
// tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (code < 0) {
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// int32_t tlen = sizeof(SMqRspHead) + len;
|
|
||||||
// void* buf = rpcMallocCont(tlen);
|
|
||||||
// if (buf == NULL) {
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// ((SMqRspHead*)buf)->mqMsgType = type;
|
|
||||||
// ((SMqRspHead*)buf)->epoch = pReq->epoch;
|
|
||||||
// ((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
|
||||||
//
|
|
||||||
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
|
||||||
//
|
|
||||||
// SEncoder encoder = {0};
|
|
||||||
// tEncoderInit(&encoder, abuf, len);
|
|
||||||
//
|
|
||||||
// if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
|
||||||
// tEncodeSMqDataRsp(&encoder, pRsp);
|
|
||||||
// } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
|
||||||
// tEncodeSTaosxRsp(&encoder, (STaosxRsp*) pRsp);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// tEncoderClear(&encoder);
|
|
||||||
//
|
|
||||||
// SRpcMsg rsp = {
|
|
||||||
// .info = pMsg->info,
|
|
||||||
// .pCont = buf,
|
|
||||||
// .contLen = tlen,
|
|
||||||
// .code = 0,
|
|
||||||
// };
|
|
||||||
//
|
|
||||||
// tmsgSendRsp(&rsp);
|
|
||||||
doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);
|
doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);
|
||||||
|
|
||||||
char buf1[80] = {0};
|
char buf1[80] = {0};
|
||||||
|
@ -348,8 +303,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
|
|
||||||
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s",
|
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%"PRIx64,
|
||||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue