From 7a65589525d7ff10446292492e0a07e13fb8a82b Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 1 Jul 2022 10:15:05 +0800 Subject: [PATCH] feat(wal): add api to check log existance --- include/libs/wal/wal.h | 2 ++ source/client/src/tmq.c | 4 ++-- source/dnode/vnode/src/tq/tq.c | 6 +----- source/libs/wal/src/walMeta.c | 4 ++++ tests/test/c/tmqSim.c | 6 +++++- 5 files changed, 14 insertions(+), 8 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 92701db2ad..eb5db9d639 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -210,6 +210,8 @@ void walCloseRef(SWalRef *); int32_t walRefVer(SWalRef *, int64_t ver); int32_t walUnrefVer(SWal *); +bool walLogExist(SWal *, int64_t ver); + // lifecycle check bool walIsEmpty(SWal *); int64_t walGetFirstVer(SWal *); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index bea9d215da..efc1e3a082 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1609,8 +1609,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int64_t transporterId = 0; /*printf("send poll\n");*/ - char offsetFormatBuf[50]; - tFormatOffset(offsetFormatBuf, 50, &pVg->currentOffsetNew); + char offsetFormatBuf[80]; + tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew); tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %s, reqId %lu", tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId); /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d80996b399..8ef66d3ef6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -276,7 +276,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { tqDebug("tmq poll: consumer %ld, offset reset to %s", consumerId, formatBuf); } else { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { - if (pReq->useSnapshot) { + if (pReq->useSnapshot && pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (!pHandle->fetchMeta) { tqOffsetResetToData(&fetchOffsetNew, 0, 0); } else { @@ -375,10 +375,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { taosMemoryFree(pHeadWithCkSum); } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) { - // 1. set uid and ts - // 2. get data (rebuild reader if needed) - // 3. get new uid and ts - tqInfo("retrieve using snapshot req offset: uid %ld ts %ld", dataRsp.reqOffset.uid, dataRsp.reqOffset.ts); if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) { ASSERT(0); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 4150fe6d1b..342ab7b152 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -19,6 +19,10 @@ #include "tref.h" #include "walInt.h" +bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) { + return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver; +} + bool FORCE_INLINE walIsEmpty(SWal* pWal) { return pWal->vers.firstVer == -1; } int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; } diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 07fdcc2415..5cf38f1e5b 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -36,7 +36,11 @@ #define MAX_CONSUMER_THREAD_CNT (16) #define MAX_VGROUP_CNT (32) -typedef enum { NOTIFY_CMD_START_CONSUM, NOTIFY_CMD_START_COMMIT, NOTIFY_CMD_ID_BUTT } NOTIFY_CMD_ID; +typedef enum { + NOTIFY_CMD_START_CONSUM, + NOTIFY_CMD_START_COMMIT, + NOTIFY_CMD_ID_BUTT, +} NOTIFY_CMD_ID; typedef struct { TdThread thread;