From ce1b6147a0c33e84f9b550ebc171f422baf829f6 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 20 Oct 2022 14:18:39 +0800 Subject: [PATCH] enh(client): add debug log --- source/client/src/clientTmq.c | 20 ++++++++++++++++---- source/libs/wal/src/walRead.c | 13 +++++++------ 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index e279c1fef5..0b7b80649e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -979,6 +979,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { const SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container); void* buf = NULL; + SMsgSendInfo* sendInfo = NULL; SCMSubscribeReq req = {0}; int32_t code = -1; @@ -1016,7 +1017,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { void* abuf = buf; tSerializeSCMSubscribeReq(&abuf, &req); - SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) goto FAIL; SMqSubscribeCbParam param = { @@ -1046,6 +1047,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { // avoid double free if msg is sent buf = NULL; + sendInfo = NULL; tsem_wait(¶m.rspSem); tsem_destroy(¶m.rspSem); @@ -1078,8 +1080,9 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { code = 0; FAIL: - if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree); + taosArrayDestroyP(req.topicNames, taosMemoryFree); taosMemoryFree(buf); + taosMemoryFree(sendInfo); return code; } @@ -1613,7 +1616,11 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (rspWrapper == NULL) { taosReadAllQitems(tmq->mqueue, tmq->qall); taosGetQitem(tmq->qall, (void**)&rspWrapper); - if (rspWrapper == NULL) return NULL; + + if (rspWrapper == NULL) { + tscDebug("consumer %" PRId64 " mqueue empty", tmq->consumerId); + return NULL; + } } if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { @@ -1711,6 +1718,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { void* rspObj; int64_t startTime = taosGetTimestampMs(); + tscDebug("consumer:%" PRId64 ", start poll at %" PRId64, tmq->consumerId, startTime); + #if 0 tmqHandleAllDelayedTask(tmq); tmqPollImpl(tmq, timeout); @@ -1745,15 +1754,18 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { rspObj = tmqHandleAllRsp(tmq, timeout, false); if (rspObj) { + tscDebug("consumer:%" PRId64 ", return rsp", tmq->consumerId); return (TAOS_RES*)rspObj; } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { + tscDebug("consumer:%" PRId64 ", return null since no committed offset", tmq->consumerId); return NULL; } if (timeout != -1) { int64_t endTime = taosGetTimestampMs(); int64_t leftTime = endTime - startTime; if (leftTime > timeout) { - tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch); + tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", end time %" PRId64, + tmq->consumerId, tmq->epoch, startTime, endTime); return NULL; } tsem_timewait(&tmq->rspSem, leftTime * 1000); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 60c8591823..8f493ddd85 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -278,12 +278,12 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { wDebug("vgId:%d, wal starts to fetch body, index:%" PRId64, pRead->pWal->cfg.vgId, ver); if (pRead->capacity < pReadHead->bodyLen) { - void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen); + SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen); if (ptr == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; return -1; } - pRead->pHead = (SWalCkHead *)ptr; + pRead->pHead = ptr; pReadHead = &pRead->pHead->head; pRead->capacity = pReadHead->bodyLen; } @@ -398,12 +398,12 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { int64_t ver = pReadHead->version; if (pRead->capacity < pReadHead->bodyLen) { - void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen); + SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen); if (ptr == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; return -1; } - *ppHead = (SWalCkHead *)ptr; + *ppHead = ptr; pReadHead = &((*ppHead)->head); pRead->capacity = pReadHead->bodyLen; } @@ -493,13 +493,14 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { } if (pReader->capacity < pReader->pHead->head.bodyLen) { - void *ptr = taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen); + SWalCkHead *ptr = + (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen); if (ptr == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; taosThreadMutexUnlock(&pReader->mutex); return -1; } - pReader->pHead = (SWalCkHead *)ptr; + pReader->pHead = ptr; pReader->capacity = pReader->pHead->head.bodyLen; }