feat(wal): support filter
This commit is contained in:
parent
1ec57798d2
commit
4a648d711f
|
@ -32,6 +32,18 @@ enum {
|
||||||
TMQ_CONF__RESET_OFFSET__LATEST = -1,
|
TMQ_CONF__RESET_OFFSET__LATEST = -1,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// clang-format off
|
||||||
|
#define IS_META_MSG(x) ( \
|
||||||
|
x == TDMT_VND_CREATE_STB \
|
||||||
|
|| x == TDMT_VND_ALTER_STB \
|
||||||
|
|| x == TDMT_VND_DROP_STB \
|
||||||
|
|| x == TDMT_VND_CREATE_TABLE \
|
||||||
|
|| x == TDMT_VND_ALTER_TABLE \
|
||||||
|
|| x == TDMT_VND_DROP_TABLE \
|
||||||
|
|| x == TDMT_VND_DROP_TTL_TABLE \
|
||||||
|
)
|
||||||
|
// clang-format on
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TMQ_MSG_TYPE__DUMMY = 0,
|
TMQ_MSG_TYPE__DUMMY = 0,
|
||||||
TMQ_MSG_TYPE__POLL_RSP,
|
TMQ_MSG_TYPE__POLL_RSP,
|
||||||
|
|
|
@ -194,7 +194,7 @@ int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
|
||||||
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
|
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
|
||||||
void walCloseReader(SWalReader *pRead);
|
void walCloseReader(SWalReader *pRead);
|
||||||
int32_t walReadVer(SWalReader *pRead, int64_t ver);
|
int32_t walReadVer(SWalReader *pRead, int64_t ver);
|
||||||
int32_t walNextValidMsg(SWalReader *pRead, SWalCkHead **ppHead);
|
int32_t walNextValidMsg(SWalReader *pRead);
|
||||||
|
|
||||||
// only for tq usage
|
// only for tq usage
|
||||||
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
|
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
|
||||||
|
|
|
@ -40,15 +40,6 @@ extern "C" {
|
||||||
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
|
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
|
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
|
||||||
#define IS_META_MSG(x) ( \
|
|
||||||
x == TDMT_VND_CREATE_STB \
|
|
||||||
|| x == TDMT_VND_ALTER_STB \
|
|
||||||
|| x == TDMT_VND_DROP_STB \
|
|
||||||
|| x == TDMT_VND_CREATE_TABLE \
|
|
||||||
|| x == TDMT_VND_ALTER_TABLE \
|
|
||||||
|| x == TDMT_VND_DROP_TABLE \
|
|
||||||
|| x == TDMT_VND_DROP_TTL_TABLE \
|
|
||||||
)
|
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
typedef struct STqOffsetStore STqOffsetStore;
|
typedef struct STqOffsetStore STqOffsetStore;
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
#include "tcoding.h"
|
#include "tcoding.h"
|
||||||
|
#include "tcommon.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,10 @@
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "walInt.h"
|
#include "walInt.h"
|
||||||
|
|
||||||
|
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer);
|
||||||
|
static int32_t walFetchBodyNew(SWalReader *pRead);
|
||||||
|
static int32_t walSkipFetchBodyNew(SWalReader *pRead);
|
||||||
|
|
||||||
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
||||||
SWalReader *pRead = taosMemoryMalloc(sizeof(SWalReader));
|
SWalReader *pRead = taosMemoryMalloc(sizeof(SWalReader));
|
||||||
if (pRead == NULL) {
|
if (pRead == NULL) {
|
||||||
|
@ -29,7 +33,12 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
||||||
pRead->curVersion = -1;
|
pRead->curVersion = -1;
|
||||||
pRead->curFileFirstVer = -1;
|
pRead->curFileFirstVer = -1;
|
||||||
pRead->capacity = 0;
|
pRead->capacity = 0;
|
||||||
pRead->cond = *cond;
|
if (cond)
|
||||||
|
pRead->cond = *cond;
|
||||||
|
else {
|
||||||
|
pRead->cond.scanMeta = 0;
|
||||||
|
pRead->cond.scanUncommited = 0;
|
||||||
|
}
|
||||||
|
|
||||||
taosThreadMutexInit(&pRead->mutex, NULL);
|
taosThreadMutexInit(&pRead->mutex, NULL);
|
||||||
|
|
||||||
|
@ -39,6 +48,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
||||||
taosMemoryFree(pRead);
|
taosMemoryFree(pRead);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pRead;
|
return pRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,10 +59,28 @@ void walCloseReader(SWalReader *pRead) {
|
||||||
taosMemoryFree(pRead);
|
taosMemoryFree(pRead);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walNextValidMsg(SWalReader *pRead, SWalCkHead **ppHead) {
|
int32_t walNextValidMsg(SWalReader *pRead) {
|
||||||
//
|
int64_t fetchVer = pRead->curVersion;
|
||||||
|
int64_t endVer = pRead->cond.scanUncommited ? walGetLastVer(pRead->pWal) : walGetCommittedVer(pRead->pWal);
|
||||||
return 0;
|
while (fetchVer <= endVer) {
|
||||||
|
if (walFetchHeadNew(pRead, fetchVer) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (pRead->pHead->head.msgType == TDMT_VND_SUBMIT ||
|
||||||
|
(IS_META_MSG(pRead->pHead->head.msgType) && pRead->cond.scanMeta)) {
|
||||||
|
if (walFetchBodyNew(pRead) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
if (walSkipFetchBodyNew(pRead) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
fetchVer++;
|
||||||
|
ASSERT(fetchVer == pRead->curVersion);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64_t ver) {
|
static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64_t ver) {
|
||||||
|
@ -156,6 +184,91 @@ static int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
|
||||||
|
|
||||||
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity = capacity; }
|
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity = capacity; }
|
||||||
|
|
||||||
|
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
||||||
|
int64_t contLen;
|
||||||
|
if (pRead->curVersion != fetchVer) {
|
||||||
|
if (walReadSeekVer(pRead, fetchVer) < 0) return -1;
|
||||||
|
}
|
||||||
|
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
||||||
|
if (contLen != sizeof(SWalCkHead)) {
|
||||||
|
if (contLen < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
}
|
||||||
|
pRead->curVersion = -1;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t walFetchBodyNew(SWalReader *pRead) {
|
||||||
|
SWalCont *pReadHead = &pRead->pHead->head;
|
||||||
|
int64_t ver = pReadHead->version;
|
||||||
|
|
||||||
|
if (pRead->capacity < pReadHead->bodyLen) {
|
||||||
|
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
||||||
|
if (ptr == NULL) {
|
||||||
|
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pRead->pHead = ptr;
|
||||||
|
pReadHead = &pRead->pHead->head;
|
||||||
|
pRead->capacity = pReadHead->bodyLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
|
||||||
|
if (pReadHead->bodyLen < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 ", since %s",
|
||||||
|
pRead->pHead->head.version, ver, tstrerror(terrno));
|
||||||
|
} else {
|
||||||
|
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 ", since file corrupted",
|
||||||
|
pRead->pHead->head.version, ver);
|
||||||
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
}
|
||||||
|
pRead->curVersion = -1;
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pReadHead->version != ver) {
|
||||||
|
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver);
|
||||||
|
pRead->curVersion = -1;
|
||||||
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (walValidBodyCksum(pRead->pHead) != 0) {
|
||||||
|
wError("wal fetch body error: % " PRId64 ", since body checksum not passed", ver);
|
||||||
|
pRead->curVersion = -1;
|
||||||
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRead->curVersion = ver + 1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
|
||||||
|
int64_t code;
|
||||||
|
|
||||||
|
ASSERT(pRead->curVersion == pRead->pHead->head.version);
|
||||||
|
|
||||||
|
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
|
||||||
|
if (code < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
pRead->curVersion = -1;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRead->curVersion++;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
||||||
int64_t code;
|
int64_t code;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue