fix(tmq): fix tmq syntax error.

This commit is contained in:
Haojun Liao 2023-07-11 12:05:49 +08:00
parent 3ffdbe923d
commit 27eabb3da9
1 changed files with 11 additions and 11 deletions

View File

@ -2030,6 +2030,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->emptyBlockReceiveTs = taosGetTimestampMs(); pVg->emptyBlockReceiveTs = taosGetTimestampMs();
pRspWrapper = tmqFreeRspWrapper(pRspWrapper); pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
taosWUnLockLatch(&tmq->lock);
continue;
} else { } else {
pVg->emptyBlockReceiveTs = 0; // reset the ts pVg->emptyBlockReceiveTs = 0; // reset the ts
} }
@ -2043,20 +2045,18 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
} }
tmq->totalRows += numOfRows; tmq->totalRows += numOfRows;
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset);
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
tmq->totalRows, pollRspWrapper->reqId); tmq->totalRows, pollRspWrapper->reqId);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
taosWUnLockLatch(&tmq->lock);
return pRsp;
}
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
return pRsp;
} else { } else {
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);