more work

This commit is contained in:
Hongze Cheng 2022-06-14 08:45:11 +00:00
parent 72e525082f
commit 9f398eca68
4 changed files with 308 additions and 293 deletions

View File

@ -35,10 +35,9 @@ typedef struct TSDBROW TSDBROW;
typedef struct TSDBKEY TSDBKEY; typedef struct TSDBKEY TSDBKEY;
typedef struct TABLEID TABLEID; typedef struct TABLEID TABLEID;
typedef struct SDelOp SDelOp; typedef struct SDelOp SDelOp;
typedef struct SDelDataItem SDelDataItem;
typedef struct SDelData SDelData; typedef struct SDelData SDelData;
typedef struct SDelIdxItem SDelIdxItem;
typedef struct SDelIdx SDelIdx; typedef struct SDelIdx SDelIdx;
typedef struct SDelDataInfo SDelDataInfo;
typedef struct STbData STbData; typedef struct STbData STbData;
typedef struct SMemTable SMemTable; typedef struct SMemTable SMemTable;
typedef struct STbDataIter STbDataIter; typedef struct STbDataIter STbDataIter;
@ -49,6 +48,15 @@ typedef struct SMapData SMapData;
typedef struct SColData SColData; typedef struct SColData SColData;
typedef struct SColDataBlock SColDataBlock; typedef struct SColDataBlock SColDataBlock;
typedef struct SBlockSMA SBlockSMA; typedef struct SBlockSMA SBlockSMA;
typedef struct SBlockIdxItem SBlockIdxItem;
typedef struct SBlockIdx SBlockIdx;
typedef struct SBlockInfo SBlockInfo;
typedef struct SBlock SBlock;
typedef struct SBlockCol SBlockCol;
typedef struct SBlockStatis SBlockStatis;
typedef struct SAggrBlkCol SAggrBlkCol;
typedef struct SBlockData SBlockData;
typedef struct SReadH SReadH;
// tsdbMemTable ============================================================================================== // tsdbMemTable ==============================================================================================
@ -110,8 +118,9 @@ typedef struct SDelFWriter SDelFWriter;
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
int32_t tsdbDelFWriterClose(SDelFWriter *pWriter, int8_t sync); int32_t tsdbDelFWriterClose(SDelFWriter *pWriter, int8_t sync);
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf, int64_t *rOffset, int64_t *rSize); int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelDataInfo *pInfo, SMapData *pDelDataMap, uint8_t **ppBuf,
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SDelIdx *pDelIdx, uint8_t **ppBuf); int64_t *rOffset, int64_t *rSize);
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SMapData *pDelIdxMap, uint8_t **ppBuf);
int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter, uint8_t **ppBuf); int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter, uint8_t **ppBuf);
// SDelFReader // SDelFReader
@ -119,27 +128,8 @@ typedef struct SDelFReader SDelFReader;
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb, uint8_t **ppBuf); int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb, uint8_t **ppBuf);
int32_t tsdbDelFReaderClose(SDelFReader *pReader); int32_t tsdbDelFReaderClose(SDelFReader *pReader);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdxItem *pItem, SDelData *pDelData, uint8_t **ppBuf); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SMapData *pDelDataMap, uint8_t **ppBuf);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf); int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppBuf);
// SCacheFWriter
typedef struct SCacheFWriter SCacheFWriter;
// SCacheFReader
typedef struct SCacheFReader SCacheFReader;
// tsdbCommit.c ==============================================================================================
// tsdbReadImpl.c ==============================================================================================
typedef struct SBlockIdxItem SBlockIdxItem;
typedef struct SBlockIdx SBlockIdx;
typedef struct SBlockInfo SBlockInfo;
typedef struct SBlock SBlock;
typedef struct SBlockCol SBlockCol;
typedef struct SBlockStatis SBlockStatis;
typedef struct SAggrBlkCol SAggrBlkCol;
typedef struct SBlockData SBlockData;
typedef struct SReadH SReadH;
// tsdbUtil.c ============================================================================================== // tsdbUtil.c ==============================================================================================
int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision); int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision);
@ -171,20 +161,20 @@ int32_t tGetBlockIdx(uint8_t *p, SBlockIdx *pBlockIdx);
int32_t tBlockCmprFn(const void *p1, const void *p2); int32_t tBlockCmprFn(const void *p1, const void *p2);
// SDelIdx // SDelIdx
int32_t tDelIdxClear(SDelIdx *pDelIdx); // int32_t tDelIdxClear(SDelIdx *pDelIdx);
int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem); // int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem);
int32_t tDelIdxGetItemByIdx(SDelIdx *pDelIdx, SDelIdxItem *pItem, int32_t idx); // int32_t tDelIdxGetItemByIdx(SDelIdx *pDelIdx, SDelIdxItem *pItem, int32_t idx);
int32_t tDelIdxGetItem(SDelIdx *pDelIdx, SDelIdxItem *pItem, TABLEID id); // int32_t tDelIdxGetItem(SDelIdx *pDelIdx, SDelIdxItem *pItem, TABLEID id);
int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx); int32_t tPutDelIdx(uint8_t *p, void *ph);
int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx); int32_t tGetDelIdx(uint8_t *p, void *ph);
// SDelData // SDelData
int32_t tDelDataClear(SDelData *pDelData); // int32_t tDelDataClear(SDelData *pDelData);
int32_t tDelDataPutItem(SDelData *pDelData, SDelDataItem *pItem); // int32_t tDelDataPutItem(SDelData *pDelData, SDelDataItem *pItem);
int32_t tDelDataGetItemByIdx(SDelData *pDelData, SDelDataItem *pItem, int32_t idx); // int32_t tDelDataGetItemByIdx(SDelData *pDelData, SDelDataItem *pItem, int32_t idx);
int32_t tDelDataGetItem(SDelData *pDelData, SDelDataItem *pItem, int64_t version); // int32_t tDelDataGetItem(SDelData *pDelData, SDelDataItem *pItem, int64_t version);
int32_t tPutDelData(uint8_t *p, SDelData *pDelData); int32_t tPutDelData(uint8_t *p, void *ph);
int32_t tGetDelData(uint8_t *p, SDelData *pDelData); int32_t tGetDelData(uint8_t *p, void *ph);
int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile); int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile);
int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile); int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile);
@ -243,6 +233,11 @@ typedef struct SMemSkipList {
SMemSkipListNode *pTail; SMemSkipListNode *pTail;
} SMemSkipList; } SMemSkipList;
struct SDelDataInfo {
tb_uid_t suid;
tb_uid_t uid;
};
struct STbData { struct STbData {
tb_uid_t suid; tb_uid_t suid;
tb_uid_t uid; tb_uid_t uid;
@ -376,22 +371,13 @@ struct SDelOp {
SDelOp *pNext; SDelOp *pNext;
}; };
struct SDelDataItem { struct SDelData {
int64_t version; int64_t version;
TSKEY sKey; TSKEY sKey;
TSKEY eKey; TSKEY eKey;
}; };
struct SDelData { struct SDelIdx {
uint32_t delimiter;
tb_uid_t suid;
tb_uid_t uid;
SOffset offset;
uint32_t nData;
uint8_t *pData;
};
struct SDelIdxItem {
tb_uid_t suid; tb_uid_t suid;
tb_uid_t uid; tb_uid_t uid;
TSKEY minKey; TSKEY minKey;
@ -402,13 +388,6 @@ struct SDelIdxItem {
int64_t size; int64_t size;
}; };
struct SDelIdx {
uint32_t delimiter;
SOffset offset;
uint32_t nData;
uint8_t *pData;
};
struct SDelFile { struct SDelFile {
TSKEY minKey; TSKEY minKey;
TSKEY maxKey; TSKEY maxKey;

View File

@ -28,14 +28,14 @@ typedef struct {
int32_t minRow; int32_t minRow;
int32_t maxRow; int32_t maxRow;
// commit file data // commit file data
TSKEY nextKey; TSKEY nextKey;
int32_t commitFid; int32_t commitFid;
TSKEY minKey; TSKEY minKey;
TSKEY maxKey; TSKEY maxKey;
SDFileSetReader *pReader; SDataFReader *pReader;
SMapData oBlockIdx; // SMapData<SBlockIdx>, read from reader SMapData oBlockIdx; // SMapData<SBlockIdx>, read from reader
SDFileSetWriter *pWriter; SDataFWriter *pWriter;
SMapData nBlockIdx; // SMapData<SBlockIdx>, build by committer SMapData nBlockIdx; // SMapData<SBlockIdx>, build by committer
// commit table data // commit table data
STbDataIter iter; STbDataIter iter;
STbDataIter *pIter; STbDataIter *pIter;
@ -46,15 +46,11 @@ typedef struct {
SColDataBlock nColDataBlock; SColDataBlock nColDataBlock;
/* commit del */ /* commit del */
SDelFReader *pDelFReader; SDelFReader *pDelFReader;
SMapData oDelIdxMap; // SMapData<SDelIdx>, old
SMapData oDelDataMap; // SMapData<SDelData>, old
SDelFWriter *pDelFWriter; SDelFWriter *pDelFWriter;
SDelIdx delIdxOld; SMapData nDelIdxMap; // SMapData<SDelIdx>, new
SDelIdx delIdxNew; SMapData nDelDataMap; // SMapData<SDelData>, new
STbData *pTbData;
SDelIdxItem *pDelIdxItem;
SDelData delDataOld;
SDelData delDataNew;
SDelIdxItem delIdxItem;
/* commit cache */
} SCommitter; } SCommitter;
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter); static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
@ -79,7 +75,7 @@ _err:
int32_t tsdbCommit(STsdb *pTsdb) { int32_t tsdbCommit(STsdb *pTsdb) {
if (!pTsdb) return 0; if (!pTsdb) return 0;
int32_t code = 0; int32_t code = 0;
SCommitter commith; SCommitter commith;
SMemTable *pMemTable = pTsdb->mem; SMemTable *pMemTable = pTsdb->mem;
@ -179,22 +175,23 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
SDelFile *pDelFileR = NULL; // TODO SDelFile *pDelFileR = NULL; // TODO
SDelFile *pDelFileW = NULL; // TODO SDelFile *pDelFileW = NULL; // TODO
tMapDataReset(&pCommitter->oDelIdxMap);
tMapDataReset(&pCommitter->nDelIdxMap);
// load old // load old
pCommitter->delIdxOld = (SDelIdx){0};
if (pDelFileR) { if (pDelFileR) {
code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL); code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
if (code) { if (code) {
goto _err; goto _err;
} }
code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->delIdxOld, &pCommitter->pBuf1); code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->oDelIdxMap, &pCommitter->pBuf1);
if (code) { if (code) {
goto _err; goto _err;
} }
} }
// prepare new // prepare new
pCommitter->delIdxNew = (SDelIdx){0};
code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW, pTsdb); code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW, pTsdb);
if (code) { if (code) {
goto _err; goto _err;

View File

@ -139,23 +139,29 @@ _err:
return code; return code;
} }
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf, int64_t *rOffset, int64_t *rSize) { int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelDataInfo *pInfo, SMapData *pDelDataMap, uint8_t **ppBuf,
int64_t *rOffset, int64_t *rSize) {
int32_t code = 0; int32_t code = 0;
uint8_t *pBuf = NULL; uint8_t *pBuf = NULL;
int64_t size; int64_t size = 0;
int64_t n; int64_t n = 0;
// prepare // prepare
pDelData->delimiter = TSDB_FILE_DLMT; size += tPutU32(NULL, TSDB_FILE_DLMT);
size += tPutI64(NULL, pInfo->suid);
size += tPutI64(NULL, pInfo->uid);
size = size + tPutMapData(NULL, pDelDataMap) + sizeof(TSCKSUM);
// alloc // alloc
if (!ppBuf) ppBuf = &pBuf; if (!ppBuf) ppBuf = &pBuf;
size = tPutDelData(NULL, pDelData) + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf, size); code = tsdbRealloc(ppBuf, size);
if (code) goto _err; if (code) goto _err;
// build // build
n = tPutDelData(*ppBuf, pDelData); n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT);
n += tPutI64(*ppBuf + n, pInfo->suid);
n += tPutI64(*ppBuf + n, pInfo->uid);
n += tPutMapData(*ppBuf + n, pDelDataMap);
taosCalcChecksumAppend(0, *ppBuf, size); taosCalcChecksumAppend(0, *ppBuf, size);
ASSERT(n + sizeof(TSCKSUM) == size); ASSERT(n + sizeof(TSCKSUM) == size);
@ -184,23 +190,24 @@ _err:
return code; return code;
} }
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SDelIdx *pDelIdx, uint8_t **ppBuf) { int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SMapData *pDelIdxMap, uint8_t **ppBuf) {
int32_t code = 0; int32_t code = 0;
int64_t size; int64_t size = 0;
int64_t n; int64_t n = 0;
uint8_t *pBuf = NULL; uint8_t *pBuf = NULL;
// prepare // prepare
pDelIdx->delimiter = TSDB_FILE_DLMT; size += tPutU32(NULL, TSDB_FILE_DLMT);
size = size + tPutMapData(NULL, pDelIdxMap) + sizeof(TSCKSUM);
// alloc // alloc
if (!ppBuf) ppBuf = &pBuf; if (!ppBuf) ppBuf = &pBuf;
size = tPutDelIdx(NULL, pDelIdx) + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf, size); code = tsdbRealloc(ppBuf, size);
if (code) goto _err; if (code) goto _err;
// build // build
n = tPutDelIdx(*ppBuf, pDelIdx); n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT);
n += tPutMapData(*ppBuf + n, pDelIdxMap);
taosCalcChecksumAppend(0, *ppBuf, size); taosCalcChecksumAppend(0, *ppBuf, size);
ASSERT(n + sizeof(TSCKSUM) == size); ASSERT(n + sizeof(TSCKSUM) == size);
@ -261,7 +268,7 @@ int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter, uint8_t **ppBuf) {
return code; return code;
_err: _err:
tsdbError("vgId:%d failed to update del file header since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d update del file hdr failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
tsdbFree(pBuf); tsdbFree(pBuf);
return code; return code;
} }
@ -320,6 +327,7 @@ _exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d del file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
@ -339,39 +347,46 @@ _exit:
return code; return code;
} }
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdxItem *pItem, SDelData *pDelData, uint8_t **ppBuf) { int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SMapData *pDelDataMap, uint8_t **ppBuf) {
int32_t code = 0; int32_t code = 0;
int64_t n; int64_t n;
uint32_t delimiter;
SDelDataInfo info;
// seek // seek
if (taosLSeekFile(pReader->pReadH, pItem->offset, SEEK_SET) < 0) { if (taosLSeekFile(pReader->pReadH, pDelIdx->offset, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
// alloc // alloc
code = tsdbRealloc(ppBuf, pItem->size); if (!ppBuf) ppBuf = &pDelDataMap->pBuf;
code = tsdbRealloc(ppBuf, pDelIdx->size);
if (code) goto _err; if (code) goto _err;
// read // read
n = taosReadFile(pReader->pReadH, *ppBuf, pItem->size); n = taosReadFile(pReader->pReadH, *ppBuf, pDelIdx->size);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
// check // check
if (!taosCheckChecksumWhole(*ppBuf, pItem->size)) { if (!taosCheckChecksumWhole(*ppBuf, pDelIdx->size)) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
goto _err; goto _err;
} }
// decode // // decode
n = tGetDelData(*ppBuf, pDelData); n = 0;
ASSERT(n + sizeof(TSCKSUM) == pItem->size); n += tGetU32(*ppBuf + n, &delimiter);
ASSERT(pDelData->delimiter == TSDB_FILE_DLMT); ASSERT(delimiter == TSDB_FILE_DLMT);
ASSERT(pDelData->suid = pItem->suid); n += tGetI64(*ppBuf + n, &info.suid);
ASSERT(pDelData->uid = pItem->uid); ASSERT(info.suid == pDelIdx->suid);
n += tGetI64(*ppBuf + n, &info.uid);
ASSERT(info.uid == pDelIdx->uid);
n += tGetMapData(*ppBuf + n, pDelDataMap);
ASSERT(n + sizeof(TSCKSUM) == pDelIdx->size);
return code; return code;
@ -380,11 +395,12 @@ _err:
return code; return code;
} }
int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf) { int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppBuf) {
int32_t code = 0; int32_t code = 0;
int32_t n; int32_t n;
int64_t offset = pReader->pFile->offset; int64_t offset = pReader->pFile->offset;
int64_t size = pReader->pFile->size - offset; int64_t size = pReader->pFile->size - offset;
uint32_t delimiter;
ASSERT(ppBuf && *ppBuf); ASSERT(ppBuf && *ppBuf);
@ -395,10 +411,9 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf)
} }
// alloc // alloc
if (!ppBuf) ppBuf = &pDelIdxMap->pBuf;
code = tsdbRealloc(ppBuf, size); code = tsdbRealloc(ppBuf, size);
if (code) { if (code) goto _err;
goto _err;
}
// read // read
if (taosReadFile(pReader->pReadH, *ppBuf, size) < size) { if (taosReadFile(pReader->pReadH, *ppBuf, size) < size) {
@ -413,13 +428,15 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf)
} }
// decode // decode
n = tGetDelIdx(*ppBuf, pDelIdx); n = 0;
ASSERT(n == size - sizeof(TSCKSUM)); n += tGetU32(*ppBuf + n, &delimiter);
ASSERT(pDelIdx->delimiter == TSDB_FILE_DLMT); ASSERT(delimiter == TSDB_FILE_DLMT);
n += tGetMapData(*ppBuf + n, pDelIdxMap);
ASSERT(n + sizeof(TSCKSUM) == size);
return code; return code;
_err: _err:
tsdbError("vgId:%d failed to read del idx since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }

View File

@ -207,18 +207,30 @@ int32_t tPutMapData(uint8_t *p, SMapData *pMapData) {
n += tPutI32v(p ? p + n : p, pMapData->nItem); n += tPutI32v(p ? p + n : p, pMapData->nItem);
if (maxOffset <= INT8_MAX) { if (maxOffset <= INT8_MAX) {
n += tPutU8(p ? p + n : p, TSDB_OFFSET_I8); n += tPutU8(p ? p + n : p, TSDB_OFFSET_I8);
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { if (p) {
n += tPutI8(p ? p + n : p, (int8_t)(((int32_t *)pMapData->pData)[iItem])); for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
n += tPutI8(p + n, (int8_t)(((int32_t *)pMapData->pData)[iItem]));
}
} else {
n = n + sizeof(int8_t) * pMapData->nItem;
} }
} else if (maxOffset <= INT16_MAX) { } else if (maxOffset <= INT16_MAX) {
n += tPutU8(p ? p + n : p, TSDB_OFFSET_I16); n += tPutU8(p ? p + n : p, TSDB_OFFSET_I16);
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { if (p) {
n += tPutI8(p ? p + n : p, (int16_t)(((int32_t *)pMapData->pData)[iItem])); for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
n += tPutI16(p + n, (int16_t)(((int32_t *)pMapData->pData)[iItem]));
}
} else {
n = n + sizeof(int16_t) * pMapData->nItem;
} }
} else { } else {
n += tPutU8(p ? p + n : p, TSDB_OFFSET_I32); n += tPutU8(p ? p + n : p, TSDB_OFFSET_I32);
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { if (p) {
n += tPutI8(p ? p + n : p, (int32_t)(((int32_t *)pMapData->pData)[iItem])); for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
n += tPutI32(p + n, (int32_t)(((int32_t *)pMapData->pData)[iItem]));
}
} else {
n = n + sizeof(int32_t) * pMapData->nItem;
} }
} }
n += tPutBinary(p ? p + n : p, pMapData->pData, pMapData->nData); n += tPutBinary(p ? p + n : p, pMapData->pData, pMapData->nData);
@ -344,36 +356,36 @@ static FORCE_INLINE int32_t tGetTSDBKEY(uint8_t *p, TSDBKEY *pKey) {
return n; return n;
} }
// SDelIdxItem ====================================================== // // SDelIdxItem ======================================================
static FORCE_INLINE int32_t tPutDelIdxItem(uint8_t *p, SDelIdxItem *pDelIdxItem) { // static FORCE_INLINE int32_t tPutDelIdxItem(uint8_t *p, SDelIdxItem *pDelIdxItem) {
int32_t n = 0; // int32_t n = 0;
n += tPutI64(p ? p + n : p, pDelIdxItem->suid); // n += tPutI64(p ? p + n : p, pDelIdxItem->suid);
n += tPutI64(p ? p + n : p, pDelIdxItem->uid); // n += tPutI64(p ? p + n : p, pDelIdxItem->uid);
n += tPutI64(p ? p + n : p, pDelIdxItem->minKey); // n += tPutI64(p ? p + n : p, pDelIdxItem->minKey);
n += tPutI64(p ? p + n : p, pDelIdxItem->maxKey); // n += tPutI64(p ? p + n : p, pDelIdxItem->maxKey);
n += tPutI64v(p ? p + n : p, pDelIdxItem->minVersion); // n += tPutI64v(p ? p + n : p, pDelIdxItem->minVersion);
n += tPutI64v(p ? p + n : p, pDelIdxItem->maxVersion); // n += tPutI64v(p ? p + n : p, pDelIdxItem->maxVersion);
n += tPutI64v(p ? p + n : p, pDelIdxItem->offset); // n += tPutI64v(p ? p + n : p, pDelIdxItem->offset);
n += tPutI64v(p ? p + n : p, pDelIdxItem->size); // n += tPutI64v(p ? p + n : p, pDelIdxItem->size);
return n; // return n;
} // }
static FORCE_INLINE int32_t tGetDelIdxItem(uint8_t *p, SDelIdxItem *pDelIdxItem) { // static FORCE_INLINE int32_t tGetDelIdxItem(uint8_t *p, SDelIdxItem *pDelIdxItem) {
int32_t n = 0; // int32_t n = 0;
n += tGetI64(p + n, &pDelIdxItem->suid); // n += tGetI64(p + n, &pDelIdxItem->suid);
n += tGetI64(p + n, &pDelIdxItem->uid); // n += tGetI64(p + n, &pDelIdxItem->uid);
n += tGetI64(p + n, &pDelIdxItem->minKey); // n += tGetI64(p + n, &pDelIdxItem->minKey);
n += tGetI64(p + n, &pDelIdxItem->maxKey); // n += tGetI64(p + n, &pDelIdxItem->maxKey);
n += tGetI64v(p + n, &pDelIdxItem->minVersion); // n += tGetI64v(p + n, &pDelIdxItem->minVersion);
n += tGetI64v(p + n, &pDelIdxItem->maxVersion); // n += tGetI64v(p + n, &pDelIdxItem->maxVersion);
n += tGetI64v(p + n, &pDelIdxItem->offset); // n += tGetI64v(p + n, &pDelIdxItem->offset);
n += tGetI64v(p + n, &pDelIdxItem->size); // n += tGetI64v(p + n, &pDelIdxItem->size);
return n; // return n;
} // }
// SBlockIdxItem ====================================================== // SBlockIdxItem ======================================================
static FORCE_INLINE int32_t tPutBlockIdxItem(uint8_t *p, SBlockIdxItem *pItem) { static FORCE_INLINE int32_t tPutBlockIdxItem(uint8_t *p, SBlockIdxItem *pItem) {
@ -501,209 +513,219 @@ int32_t tBlockCmprFn(const void *p1, const void *p2) {
} }
// SDelIdx ====================================================== // SDelIdx ======================================================
int32_t tDelIdxClear(SDelIdx *pDelIdx) { // int32_t tDelIdxClear(SDelIdx *pDelIdx) {
int32_t code = 0; // int32_t code = 0;
tdbFree(pDelIdx->offset.pOffset); // tdbFree(pDelIdx->offset.pOffset);
tdbFree(pDelIdx->pData); // tdbFree(pDelIdx->pData);
return code; // return code;
} // }
int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem) { // int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem) {
int32_t code = 0; // int32_t code = 0;
uint32_t offset = pDelIdx->nData; // uint32_t offset = pDelIdx->nData;
// offset // // offset
code = tsdbAddOffset(&pDelIdx->offset, offset); // code = tsdbAddOffset(&pDelIdx->offset, offset);
if (code) goto _exit; // if (code) goto _exit;
// alloc // // alloc
pDelIdx->nData += tPutDelIdxItem(NULL, pItem); // pDelIdx->nData += tPutDelIdxItem(NULL, pItem);
code = tsdbRealloc(&pDelIdx->pData, pDelIdx->nData); // code = tsdbRealloc(&pDelIdx->pData, pDelIdx->nData);
if (code) goto _exit; // if (code) goto _exit;
// put // // put
tPutDelIdxItem(pDelIdx->pData + offset, pItem); // tPutDelIdxItem(pDelIdx->pData + offset, pItem);
_exit: // _exit:
return code; // return code;
} // }
int32_t tDelIdxGetItemByIdx(SDelIdx *pDelIdx, SDelIdxItem *pItem, int32_t idx) { // int32_t tDelIdxGetItemByIdx(SDelIdx *pDelIdx, SDelIdxItem *pItem, int32_t idx) {
int32_t code = 0; // int32_t code = 0;
int32_t offset; // int32_t offset;
offset = tsdbGetOffset(&pDelIdx->offset, idx); // offset = tsdbGetOffset(&pDelIdx->offset, idx);
if (offset < 0) { // if (offset < 0) {
code = TSDB_CODE_NOT_FOUND; // code = TSDB_CODE_NOT_FOUND;
goto _exit; // goto _exit;
} // }
tGetDelIdxItem(pDelIdx->pData + offset, pItem); // tGetDelIdxItem(pDelIdx->pData + offset, pItem);
_exit: // _exit:
return code; // return code;
} // }
int32_t tDelIdxGetItem(SDelIdx *pDelIdx, SDelIdxItem *pItem, TABLEID id) { // int32_t tDelIdxGetItem(SDelIdx *pDelIdx, SDelIdxItem *pItem, TABLEID id) {
int32_t code = 0; // int32_t code = 0;
int32_t lidx = 0; // int32_t lidx = 0;
int32_t ridx = pDelIdx->offset.nOffset - 1; // int32_t ridx = pDelIdx->offset.nOffset - 1;
int32_t midx; // int32_t midx;
int32_t c; // int32_t c;
while (lidx <= ridx) { // while (lidx <= ridx) {
midx = (lidx + ridx) / 2; // midx = (lidx + ridx) / 2;
code = tDelIdxGetItemByIdx(pDelIdx, pItem, midx); // code = tDelIdxGetItemByIdx(pDelIdx, pItem, midx);
if (code) goto _exit; // if (code) goto _exit;
c = tTABLEIDCmprFn(&id, pItem); // c = tTABLEIDCmprFn(&id, pItem);
if (c == 0) { // if (c == 0) {
goto _exit; // goto _exit;
} else if (c < 0) { // } else if (c < 0) {
ridx = midx - 1; // ridx = midx - 1;
} else { // } else {
lidx = midx + 1; // lidx = midx + 1;
} // }
} // }
code = TSDB_CODE_NOT_FOUND; // code = TSDB_CODE_NOT_FOUND;
_exit: // _exit:
return code; // return code;
} // }
int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx) { int32_t tPutDelIdx(uint8_t *p, void *ph) {
int32_t n = 0; SDelIdx *pDelIdx = (SDelIdx *)ph;
int32_t n = 0;
n += tPutU32(p ? p + n : p, pDelIdx->delimiter); n += tPutI64(p ? p + n : p, pDelIdx->suid);
n += tPutOffset(p ? p + n : p, &pDelIdx->offset); n += tPutI64(p ? p + n : p, pDelIdx->uid);
n += tPutBinary(p ? p + n : p, pDelIdx->pData, pDelIdx->nData); n += tPutI64(p ? p + n : p, pDelIdx->minKey);
n += tPutI64(p ? p + n : p, pDelIdx->maxKey);
n += tPutI64v(p ? p + n : p, pDelIdx->minVersion);
n += tPutI64v(p ? p + n : p, pDelIdx->maxVersion);
n += tPutI64v(p ? p + n : p, pDelIdx->offset);
n += tPutI64v(p ? p + n : p, pDelIdx->size);
return n; return n;
} }
int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx) { int32_t tGetDelIdx(uint8_t *p, void *ph) {
int32_t n = 0; SDelIdx *pDelIdx = (SDelIdx *)ph;
int32_t n = 0;
n += tGetU32(p + n, &pDelIdx->delimiter); n += tGetI64(p + n, &pDelIdx->suid);
n += tGetOffset(p + n, &pDelIdx->offset); n += tGetI64(p + n, &pDelIdx->uid);
n += tGetBinary(p + n, &pDelIdx->pData, &pDelIdx->nData); n += tGetI64(p + n, &pDelIdx->minKey);
n += tGetI64(p + n, &pDelIdx->maxKey);
n += tGetI64v(p + n, &pDelIdx->minVersion);
n += tGetI64v(p + n, &pDelIdx->maxVersion);
n += tGetI64v(p + n, &pDelIdx->offset);
n += tGetI64v(p + n, &pDelIdx->size);
return n; return n;
} }
// SDelDataItem ====================================================== // // SDelDataItem ======================================================
static FORCE_INLINE int32_t tPutDelDataItem(uint8_t *p, SDelDataItem *pItem) { // static FORCE_INLINE int32_t tPutDelDataItem(uint8_t *p, SDelDataItem *pItem) {
int32_t n = 0; // int32_t n = 0;
n += tPutI64v(p ? p + n : p, pItem->version); // n += tPutI64v(p ? p + n : p, pItem->version);
n += tPutI64(p ? p + n : p, pItem->sKey); // n += tPutI64(p ? p + n : p, pItem->sKey);
n += tPutI64(p ? p + n : p, pItem->eKey); // n += tPutI64(p ? p + n : p, pItem->eKey);
return n; // return n;
} // }
static FORCE_INLINE int32_t tGetDelDataItem(uint8_t *p, SDelDataItem *pItem) { // static FORCE_INLINE int32_t tGetDelDataItem(uint8_t *p, SDelDataItem *pItem) {
int32_t n = 0; // int32_t n = 0;
n += tGetI64v(p + n, &pItem->version); // n += tGetI64v(p + n, &pItem->version);
n += tGetI64(p + n, &pItem->sKey); // n += tGetI64(p + n, &pItem->sKey);
n += tGetI64(p + n, &pItem->eKey); // n += tGetI64(p + n, &pItem->eKey);
return n; // return n;
} // }
// SDelData ====================================================== // SDelData ======================================================
int32_t tDelDataClear(SDelData *pDelData) { // int32_t tDelDataClear(SDelData *pDelData) {
int32_t code = 0; // int32_t code = 0;
tsdbFree(pDelData->offset.pOffset); // tsdbFree(pDelData->offset.pOffset);
tsdbFree(pDelData->pData); // tsdbFree(pDelData->pData);
return code; // return code;
} // }
int32_t tDelDataPutItem(SDelData *pDelData, SDelDataItem *pItem) { // int32_t tDelDataPutItem(SDelData *pDelData, SDelDataItem *pItem) {
int32_t code = 0; // int32_t code = 0;
uint32_t offset = pDelData->nData; // uint32_t offset = pDelData->nData;
// offset // // offset
code = tsdbAddOffset(&pDelData->offset, offset); // code = tsdbAddOffset(&pDelData->offset, offset);
if (code) goto _exit; // if (code) goto _exit;
// alloc // // alloc
pDelData->nData += tPutDelDataItem(NULL, pItem); // pDelData->nData += tPutDelDataItem(NULL, pItem);
code = tsdbRealloc(&pDelData->pData, pDelData->nData); // code = tsdbRealloc(&pDelData->pData, pDelData->nData);
if (code) goto _exit; // if (code) goto _exit;
// put // // put
tPutDelDataItem(pDelData->pData + offset, pItem); // tPutDelDataItem(pDelData->pData + offset, pItem);
_exit: // _exit:
return code; // return code;
} // }
int32_t tDelDataGetItemByIdx(SDelData *pDelData, SDelDataItem *pItem, int32_t idx) { // int32_t tDelDataGetItemByIdx(SDelData *pDelData, SDelDataItem *pItem, int32_t idx) {
int32_t code = 0; // int32_t code = 0;
int32_t offset; // int32_t offset;
offset = tsdbGetOffset(&pDelData->offset, idx); // offset = tsdbGetOffset(&pDelData->offset, idx);
if (offset < 0) { // if (offset < 0) {
code = TSDB_CODE_NOT_FOUND; // code = TSDB_CODE_NOT_FOUND;
goto _exit; // goto _exit;
} // }
tGetDelDataItem(pDelData->pData + offset, pItem); // tGetDelDataItem(pDelData->pData + offset, pItem);
_exit: // _exit:
return code; // return code;
} // }
int32_t tDelDataGetItem(SDelData *pDelData, SDelDataItem *pItem, int64_t version) { // int32_t tDelDataGetItem(SDelData *pDelData, SDelDataItem *pItem, int64_t version) {
int32_t code = 0; // int32_t code = 0;
int32_t lidx = 0; // int32_t lidx = 0;
int32_t ridx = pDelData->offset.nOffset - 1; // int32_t ridx = pDelData->offset.nOffset - 1;
int32_t midx; // int32_t midx;
while (lidx <= ridx) { // while (lidx <= ridx) {
midx = (lidx + ridx) / 2; // midx = (lidx + ridx) / 2;
code = tDelDataGetItemByIdx(pDelData, pItem, midx); // code = tDelDataGetItemByIdx(pDelData, pItem, midx);
if (code) goto _exit; // if (code) goto _exit;
if (version == pItem->version) { // if (version == pItem->version) {
goto _exit; // goto _exit;
} else if (version < pItem->version) { // } else if (version < pItem->version) {
ridx = midx - 1; // ridx = midx - 1;
} else { // } else {
ridx = midx + 1; // ridx = midx + 1;
} // }
} // }
code = TSDB_CODE_NOT_FOUND; // code = TSDB_CODE_NOT_FOUND;
_exit: // _exit:
return code; // return code;
} // }
int32_t tPutDelData(uint8_t *p, SDelData *pDelData) { int32_t tPutDelData(uint8_t *p, void *ph) {
int32_t n = 0; SDelData *pDelData = (SDelData *)ph;
int32_t n = 0;
n += tPutU32(p ? p + n : p, pDelData->delimiter); n += tPutI64v(p ? p + n : p, pDelData->version);
n += tPutI64(p ? p + n : p, pDelData->suid); n += tPutI64(p ? p + n : p, pDelData->sKey);
n += tPutI64(p ? p + n : p, pDelData->uid); n += tPutI64(p ? p + n : p, pDelData->eKey);
n += tPutOffset(p ? p + n : p, &pDelData->offset);
n += tPutBinary(p ? p + n : p, pDelData->pData, pDelData->nData);
return n; return n;
} }
int32_t tGetDelData(uint8_t *p, SDelData *pDelData) { int32_t tGetDelData(uint8_t *p, void *ph) {
int32_t n = 0; SDelData *pDelData = (SDelData *)ph;
int32_t n = 0;
n += tGetU32(p + n, &pDelData->delimiter); n += tGetI64v(p + n, &pDelData->version);
n += tGetI64(p + n, &pDelData->suid); n += tGetI64(p + n, &pDelData->sKey);
n += tGetI64(p + n, &pDelData->uid); n += tGetI64(p + n, &pDelData->eKey);
n += tGetOffset(p + n, &pDelData->offset);
n += tGetBinary(p + n, &pDelData->pData, &pDelData->nData);
return n; return n;
} }