refactor: do some internal refactor and add some logs.
This commit is contained in:
parent
abe37a16e3
commit
a2a7dffb5d
|
@ -641,17 +641,14 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
|
|||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
|
||||
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
|
||||
tscDebug("consumer:0x%" PRIx64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
|
||||
numOfVgroups);
|
||||
|
||||
for (int32_t j = 0; j < numOfVgroups; j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
|
||||
pVg->vgId);
|
||||
tscDebug("consumer:0x%" PRIx64 " begin commit for topic %s, vgId:%d, ordinal:%d/%d", tmq->consumerId, pTopic->topicName,
|
||||
pVg->vgId, j + 1, numOfVgroups);
|
||||
|
||||
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
|
||||
tscDebug("consumer: %" PRId64 ", vg:%d, current %" PRId64 ", committed %" PRId64 "", tmq->consumerId, pVg->vgId,
|
||||
tscDebug("consumer:0x%" PRId64 " vg:%d, current %" PRId64 ", committed %" PRId64 "", tmq->consumerId, pVg->vgId,
|
||||
pVg->currentOffset.version, pVg->committedOffset.version);
|
||||
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
|
||||
continue;
|
||||
|
@ -811,7 +808,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
|||
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pRefId = pTmq->refId;
|
||||
|
||||
tscDebug("consumer:0x%"PRIx64" retrieve ep from mnode in 1s", pTmq->consumerId);
|
||||
tscDebug("consumer:0x%"PRIx64" will retrieve ep from mnode in 1s", pTmq->consumerId);
|
||||
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
|
||||
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
|
||||
tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);
|
||||
|
@ -819,7 +816,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
|||
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pRefId = pTmq->refId;
|
||||
|
||||
tscDebug("consumer:0x%"PRIx64" commit to mnode in %.2f s", pTmq->consumerId, pTmq->autoCommitInterval/1000.0);
|
||||
tscDebug("consumer:0x%"PRIx64" will commit to mnode in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0);
|
||||
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
|
||||
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
|
||||
// do nothing
|
||||
|
@ -1066,7 +1063,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
SCMSubscribeReq req = {0};
|
||||
int32_t code = 0;
|
||||
|
||||
tscDebug("consumer:0x%"PRIx64", tmq subscribe start, numOfTopic %d", tmq->consumerId, sz);
|
||||
tscDebug("consumer:0x%"PRIx64" tmq subscribe start, numOfTopic %d", tmq->consumerId, sz);
|
||||
|
||||
req.consumerId = tmq->consumerId;
|
||||
tstrncpy(req.clientId, tmq->clientId, 256);
|
||||
|
@ -1156,7 +1153,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
goto FAIL;
|
||||
}
|
||||
|
||||
tscDebug("consumer:0x%"PRIx64", mnd not ready for subscribe, retry count:%d in 500ms", tmq->consumerId, retryCnt);
|
||||
tscDebug("consumer:0x%"PRIx64", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
|
||||
taosMsleep(500);
|
||||
}
|
||||
|
||||
|
@ -1435,11 +1432,15 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
// Epoch will only increase when received newer epoch ep msg
|
||||
SMqRspHead* head = pMsg->pData;
|
||||
int32_t epoch = atomic_load_32(&tmq->epoch);
|
||||
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
|
||||
if (head->epoch <= epoch) {
|
||||
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
|
||||
tmq->consumerId, head->epoch, epoch);
|
||||
goto END;
|
||||
}
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
|
||||
head->epoch, epoch);
|
||||
|
||||
if (!async) {
|
||||
SMqAskEpRsp rsp;
|
||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
||||
|
@ -1548,7 +1549,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
|||
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
|
||||
|
||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||
tscDebug("consumer:0x%" PRIx64 ", ask ep from mnode", tmq->consumerId);
|
||||
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d", tmq->consumerId, async);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
|
|
|
@ -671,7 +671,7 @@ TEST(clientCase, generated_request_id_test) {
|
|||
uint64_t v = generateRequestId();
|
||||
void* result = taosHashGet(phash, &v, sizeof(v));
|
||||
if (result != nullptr) {
|
||||
printf("0x%lx, index:%d\n", v, i);
|
||||
// printf("0x%llx, index:%d\n", v, i);
|
||||
}
|
||||
assert(result == nullptr);
|
||||
taosHashPut(phash, &v, sizeof(v), NULL, 0);
|
||||
|
|
Loading…
Reference in New Issue