more work
This commit is contained in:
parent
39626c1bc6
commit
bcc01d630b
|
@ -219,6 +219,15 @@ int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockSta
|
|||
// SDelFReader
|
||||
|
||||
// tsdbUtil.c ==============================================================================================
|
||||
int32_t tsdbRealloc(uint8_t **ppBuf, int64_t size);
|
||||
void tsdbFree(uint8_t *pBuf);
|
||||
|
||||
// STMap
|
||||
typedef struct STMap STMap;
|
||||
|
||||
int32_t tPutTMap(uint8_t *p, STMap *pMap);
|
||||
int32_t tGetTMap(uint8_t *p, STMap *pMap);
|
||||
|
||||
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
|
||||
int32_t tsdbKeyCmprFn(const void *p1, const void *p2);
|
||||
|
||||
|
@ -244,8 +253,6 @@ struct STsdb {
|
|||
STsdbFS *fs;
|
||||
};
|
||||
|
||||
#if 1 // ======================================
|
||||
|
||||
struct STable {
|
||||
uint64_t suid;
|
||||
uint64_t uid;
|
||||
|
@ -771,29 +778,23 @@ typedef struct {
|
|||
} SDelDataItem;
|
||||
|
||||
struct SDelData {
|
||||
uint32_t delimiter;
|
||||
tb_uid_t suid;
|
||||
tb_uid_t uid;
|
||||
int8_t flags;
|
||||
int64_t nItem;
|
||||
uint8_t *pOffset;
|
||||
uint32_t nData;
|
||||
uint8_t *pData;
|
||||
int64_t version;
|
||||
TSKEY sKey;
|
||||
TSKEY eKey;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
struct SDelIdx {
|
||||
tb_uid_t suid;
|
||||
tb_uid_t uid;
|
||||
TSKEY minKey;
|
||||
TSKEY maxKey;
|
||||
int64_t maxVersion;
|
||||
int64_t minVersion;
|
||||
int64_t maxVersion;
|
||||
int64_t offset;
|
||||
int64_t size;
|
||||
} SDelIdxItem;
|
||||
};
|
||||
|
||||
struct SDelIdx {
|
||||
uint32_t delimiter;
|
||||
struct STMap {
|
||||
uint8_t flags;
|
||||
uint32_t nOffset;
|
||||
uint8_t *pOffset;
|
||||
|
@ -801,7 +802,14 @@ struct SDelIdx {
|
|||
uint8_t *pData;
|
||||
};
|
||||
|
||||
#endif
|
||||
struct SDelFile {
|
||||
TSKEY minKey;
|
||||
TSKEY maxKey;
|
||||
int64_t minVersion;
|
||||
int64_t maxVersion;
|
||||
int64_t size;
|
||||
int64_t offset;
|
||||
};
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -42,6 +42,8 @@ struct SCommitter {
|
|||
SDelFWriter *pDelFWriter;
|
||||
SDelIdx oDelIdx;
|
||||
SDelIdx nDelIdx;
|
||||
SDelData oDelData;
|
||||
SDelData nDelData;
|
||||
/* commit cache */
|
||||
};
|
||||
|
||||
|
@ -248,6 +250,21 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
|
|||
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
|
||||
int32_t code = 0;
|
||||
|
||||
code = tsdbWriteDelIdx(pCommitter->pDelFWriter, &pCommitter->nDelIdx, &pCommitter->pBuf3);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = tsdbDelFWriterClose(pCommitter->pDelFWriter);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (pCommitter->pDelFReader) {
|
||||
code = tsdbDelFReaderClose(pCommitter->pDelFReader);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
|
|
|
@ -18,14 +18,6 @@
|
|||
static const char *tsdbFileSuffix[] = {".tombstone", ".cache", ".index", ".data", ".last", ".sma", ""};
|
||||
|
||||
// .tombstone
|
||||
struct SDelFile {
|
||||
TSKEY minKey;
|
||||
TSKEY maxKey;
|
||||
int64_t minVersion;
|
||||
int64_t maxVersion;
|
||||
int64_t size;
|
||||
int64_t offset;
|
||||
};
|
||||
|
||||
struct STsdbIndexFile {
|
||||
int64_t size;
|
||||
|
|
|
@ -105,6 +105,7 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SDelIdx *pDelIdx, uint8_t **ppBuf)
|
|||
|
||||
// SDelFReader ====================================================
|
||||
struct SDelFReader {
|
||||
STsdb *pTsdb;
|
||||
SDelFile *pFile;
|
||||
TdFilePtr pReadH;
|
||||
};
|
||||
|
@ -129,6 +130,38 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelData *pDelData, uint8_t **ppBu
|
|||
|
||||
int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf) {
|
||||
int32_t code = 0;
|
||||
// TODO
|
||||
int64_t offset = pReader->pFile->offset;
|
||||
int64_t size = pReader->pFile->size - offset;
|
||||
|
||||
// seek
|
||||
if (taosLSeekFile(pReader->pReadH, pReader->pFile->offset, SEEK_SET) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// read
|
||||
if (taosReadFile(pReader->pReadH, *ppBuf, size) < size) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// realloc buf
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// check
|
||||
if (!taosCheckChecksumWhole(*ppBuf, size)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// decode
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d failed to read del idx since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
|
@ -15,6 +15,41 @@
|
|||
|
||||
#include "tsdb.h"
|
||||
|
||||
int32_t tsdbRealloc(uint8_t **ppBuf, int64_t size) {
|
||||
int32_t code = 0;
|
||||
int64_t bsize = 0;
|
||||
uint8_t *pBuf;
|
||||
|
||||
if (*ppBuf) {
|
||||
bsize = *(int64_t *)((*ppBuf) - sizeof(int64_t));
|
||||
}
|
||||
|
||||
if (bsize >= size) goto _exit;
|
||||
|
||||
if (bsize == 0) bsize = 128;
|
||||
while (bsize < size) {
|
||||
bsize *= 2;
|
||||
}
|
||||
|
||||
pBuf = taosMemoryRealloc(*ppBuf ? (*ppBuf) - sizeof(int64_t) : *ppBuf, bsize + sizeof(int64_t));
|
||||
if (pBuf == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
*(int64_t *)pBuf = bsize;
|
||||
*ppBuf = pBuf + sizeof(int64_t);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
void tsdbFree(uint8_t *pBuf) {
|
||||
if (pBuf) {
|
||||
taosMemoryFree(pBuf - sizeof(int64_t));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tTABLEIDCmprFn(const void *p1, const void *p2) {
|
||||
TABLEID *pId1 = (TABLEID *)p1;
|
||||
TABLEID *pId2 = (TABLEID *)p2;
|
||||
|
@ -53,24 +88,22 @@ int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tPutSDelIdx(uint8_t *p, SDelIdx *pDelIdx) {
|
||||
int32_t tPutTMap(uint8_t *p, STMap *pMap) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutU32(p ? p + n : p, pDelIdx->delimiter);
|
||||
n += tPutU8(p ? p + n : p, pDelIdx->flags);
|
||||
n += tPutBinary(p ? p + n : p, pDelIdx->pOffset, pDelIdx->nOffset);
|
||||
n += tPutBinary(p ? p + n : p, pDelIdx->pData, pDelIdx->nData);
|
||||
n += tPutU8(p ? p + n : p, pMap->flags);
|
||||
n += tPutBinary(p ? p + n : p, pMap->pOffset, pMap->nOffset);
|
||||
n += tPutBinary(p ? p + n : p, pMap->pData, pMap->nData);
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
int32_t tGetSDelIdx(uint8_t *p, SDelIdx *pDelIdx) {
|
||||
int32_t tGetTMap(uint8_t *p, STMap *pMap) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tGetU32(p + n, &pDelIdx->delimiter);
|
||||
n += tGetU8(p + n, &pDelIdx->flags);
|
||||
n += tGetBinary(p + n, &pDelIdx->pOffset, &pDelIdx->nOffset);
|
||||
n += tGetBinary(p + n, &pDelIdx->pData, &pDelIdx->nData);
|
||||
n += tGetU8(p, &pMap->flags);
|
||||
n += tGetBinary(p, &pMap->pOffset, &pMap->nOffset);
|
||||
n += tGetBinary(p, &pMap->pData, &pMap->nData);
|
||||
|
||||
return n;
|
||||
}
|
Loading…
Reference in New Issue