Merge pull request #14415 from taosdata/feature/stream
feat(wal): add api to check log existance
This commit is contained in:
commit
98d2f42640
|
@ -210,6 +210,8 @@ void walCloseRef(SWalRef *);
|
|||
int32_t walRefVer(SWalRef *, int64_t ver);
|
||||
int32_t walUnrefVer(SWal *);
|
||||
|
||||
bool walLogExist(SWal *, int64_t ver);
|
||||
|
||||
// lifecycle check
|
||||
bool walIsEmpty(SWal *);
|
||||
int64_t walGetFirstVer(SWal *);
|
||||
|
|
|
@ -1609,8 +1609,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
|||
int64_t transporterId = 0;
|
||||
/*printf("send poll\n");*/
|
||||
|
||||
char offsetFormatBuf[50];
|
||||
tFormatOffset(offsetFormatBuf, 50, &pVg->currentOffsetNew);
|
||||
char offsetFormatBuf[80];
|
||||
tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew);
|
||||
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %s, reqId %lu", tmq->consumerId,
|
||||
pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
|
||||
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
|
||||
|
|
|
@ -276,7 +276,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
tqDebug("tmq poll: consumer %ld, offset reset to %s", consumerId, formatBuf);
|
||||
} else {
|
||||
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
||||
if (pReq->useSnapshot) {
|
||||
if (pReq->useSnapshot && pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
if (!pHandle->fetchMeta) {
|
||||
tqOffsetResetToData(&fetchOffsetNew, 0, 0);
|
||||
} else {
|
||||
|
@ -375,10 +375,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
|
||||
taosMemoryFree(pHeadWithCkSum);
|
||||
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
// 1. set uid and ts
|
||||
// 2. get data (rebuild reader if needed)
|
||||
// 3. get new uid and ts
|
||||
|
||||
tqInfo("retrieve using snapshot req offset: uid %ld ts %ld", dataRsp.reqOffset.uid, dataRsp.reqOffset.ts);
|
||||
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
|
||||
ASSERT(0);
|
||||
|
|
|
@ -19,6 +19,10 @@
|
|||
#include "tref.h"
|
||||
#include "walInt.h"
|
||||
|
||||
bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
|
||||
return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver;
|
||||
}
|
||||
|
||||
bool FORCE_INLINE walIsEmpty(SWal* pWal) { return pWal->vers.firstVer == -1; }
|
||||
|
||||
int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
|
||||
|
|
|
@ -36,7 +36,11 @@
|
|||
#define MAX_CONSUMER_THREAD_CNT (16)
|
||||
#define MAX_VGROUP_CNT (32)
|
||||
|
||||
typedef enum { NOTIFY_CMD_START_CONSUM, NOTIFY_CMD_START_COMMIT, NOTIFY_CMD_ID_BUTT } NOTIFY_CMD_ID;
|
||||
typedef enum {
|
||||
NOTIFY_CMD_START_CONSUM,
|
||||
NOTIFY_CMD_START_COMMIT,
|
||||
NOTIFY_CMD_ID_BUTT,
|
||||
} NOTIFY_CMD_ID;
|
||||
|
||||
typedef struct {
|
||||
TdThread thread;
|
||||
|
|
Loading…
Reference in New Issue