From 6af7d4cfad373ad99f959453ac33e9c15d0f0776 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 11 Jun 2022 09:20:29 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 8 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 109 +++++++++++++++--- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 43 ++++++- source/dnode/vnode/src/tsdb/tsdbUtil.c | 25 ++++ 4 files changed, 163 insertions(+), 22 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 7f554a1a5f..b83638d0f4 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -90,7 +90,7 @@ typedef struct SDelFWriter SDelFWriter; int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFWriterClose(SDelFWriter *pWriter, int8_t sync); -int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf, SDelIdxItem *pItem); +int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf); int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SDelIdx *pDelIdx, uint8_t **ppBuf); int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter, uint8_t **ppBuf); @@ -99,7 +99,7 @@ typedef struct SDelFReader SDelFReader; int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb, uint8_t **ppBuf); int32_t tsdbDelFReaderClose(SDelFReader *pReader); -int32_t tsdbReadDelData(SDelFReader *pReader, SDelData *pDelData, uint8_t **ppBuf); +int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdxItem *pItem, SDelData *pDelData, uint8_t **ppBuf); int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf); // SCacheFWriter @@ -151,10 +151,14 @@ int32_t tsdbKeyCmprFn(const void *p1, const void *p2); // SDelIdx int32_t tDelIdxGetSize(SDelIdx *pDelIdx); int32_t tDelIdxGetItem(SDelIdx *pDelIdx, int32_t idx, SDelIdxItem *pItem); +int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem); int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx); int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx); // SDelData +int32_t tDelDataGetSize(SDelData *pDelData); +int32_t tDelDataGetItem(SDelData *pDelData, int32_t idx, SDelDataItem *pItem); +int32_t tDelDataPutItem(SDelData *pDelData, SDelDataItem *pItem); int32_t tPutDelData(uint8_t *p, SDelData *pDelData); int32_t tGetDelData(uint8_t *p, SDelData *pDelData); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 81f02d7ed0..ebc6b5d65a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -46,6 +46,7 @@ struct SCommitter { SDelIdx nDelIdx; SDelData oDelData; SDelData nDelData; + SDelIdxItem delIdxItem; /* commit cache */ }; @@ -203,6 +204,8 @@ _err: return code; } +static int32_t tsdbCommitTableDel(SCommitter *pCommitter); + static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; @@ -215,32 +218,28 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) { SDelIdx *pDelIdx = NULL; SDelIdx delIdx; - if (iTbData < nTbData) { - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - } - if (iDelIdx < nDelIdx) { - // tIMapGet(); - pDelIdx = &delIdx; - } + // if (iTbData < nTbData) { + // pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + // } + // if (iDelIdx < nDelIdx) { + // // tIMapGet(); + // pDelIdx = &delIdx; + // } while (iTbData < nTbData || iDelIdx < nDelIdx) { if (pTbData && pDelIdx) { } else { } - // start - // 1. load table del if exist - - // impl - while (1) { - // do merge - } - - // end - // tsdbWriteDelData + code = tsdbCommitTableDel(pCommitter); + if (code) goto _err; } return code; + +_err: + tsdbError("vgId:%d commit del impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; } static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { @@ -540,6 +539,82 @@ static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter) { return code; } +static int32_t tsdbCommitTableDelStart(SCommitter *pCommitter) { + int32_t code = 0; + + // load old + pCommitter->oDelData = (SDelData){0}; + if (0) { + code = tsdbReadDelData(pCommitter->pDelFReader, NULL /*TODO*/, &pCommitter->oDelData, &pCommitter->pBuf4); + if (code) goto _err; + } + + // prepare new + pCommitter->nDelData = (SDelData){0}; + + return code; + +_err: + tsdbError("vgId:%d commit table del start failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + return code; +} + +static int32_t tsdbCommitTableDelImpl(SCommitter *pCommitter) { + int32_t code = 0; + SDelOp *pDelOp = NULL; + + for (; pDelOp; pDelOp = pDelOp->pNext) { + SDelDataItem item = {.version = pDelOp->version, .sKey = pDelOp->sKey, .eKey = pDelOp->eKey}; + + code = tDelDataPutItem(&pCommitter->nDelData, &item); + if (code) goto _err; + } + + return code; + +_err: + return code; +} + +static int32_t tsdbCommitTableDelEnd(SCommitter *pCommitter) { + int32_t code = 0; + + // write table del data + code = tsdbWriteDelData(pCommitter->pDelFWriter, &pCommitter->nDelData, NULL); + if (code) goto _err; + + // add SDelIdxItem + code = tDelIdxPutItem(&pCommitter->nDelIdx, &pCommitter->delIdxItem); + if (code) goto _err; + + return code; + +_err: + tsdbError("vgId:%d commit table del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + return code; +} + +static int32_t tsdbCommitTableDel(SCommitter *pCommitter) { + int32_t code = 0; + + // start + code = tsdbCommitTableDelStart(pCommitter); + if (code) goto _err; + + // impl + code = tsdbCommitTableDelImpl(pCommitter); + if (code) goto _err; + + // end + code = tsdbCommitTableDelEnd(pCommitter); + if (code) goto _err; + + return code; + +_err: + return code; +} + // // ===================================== OLD ====================================== #if 0 struct SCommitIter { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index d222fc5b3d..49406961aa 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -136,13 +136,15 @@ _err: return code; } -int32_t tsdbWriteDelDta(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf) { +int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf) { int32_t code = 0; uint8_t *pBuf = NULL; int64_t size; int64_t n; // prepare + pDelData->delimiter = TSDB_FILE_DLMT; + // todo // alloc if (!ppBuf) ppBuf = &pBuf; @@ -334,9 +336,44 @@ _exit: return code; } -int32_t tsdbReadDelData(SDelFReader *pReader, SDelData *pDelData, uint8_t **ppBuf) { +int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdxItem *pItem, SDelData *pDelData, uint8_t **ppBuf) { int32_t code = 0; - // TODO + int64_t n; + + // seek + if (taosLSeekFile(pReader->pReadH, pItem->offset, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // alloc + code = tsdbRealloc(ppBuf, pItem->size); + if (code) goto _err; + + // read + n = taosReadFile(pReader->pReadH, *ppBuf, pItem->size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // check + if (!taosCheckChecksumWhole(*ppBuf, pItem->size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // decode + n = tGetDelData(*ppBuf, pDelData); + ASSERT(n + sizeof(TSCKSUM) == pItem->size); + ASSERT(pDelData->delimiter == TSDB_FILE_DLMT); + ASSERT(pDelData->suid = pItem->suid); + ASSERT(pDelData->uid = pItem->uid); + + return code; + +_err: + tsdbError("vgId:%d read del data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 3fb484e52f..c1093583dd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -101,6 +101,12 @@ int32_t tDelIdxGetItem(SDelIdx *pDelIdx, int32_t idx, SDelIdxItem *pItem) { return code; } +int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem) { + int32_t code = 0; + // TODO + return code; +} + int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx) { int32_t n = 0; @@ -123,6 +129,25 @@ int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx) { return n; } +// SDelData ====================================================== +int32_t tDelDataGetSize(SDelData *pDelData) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tDelDataGetItem(SDelData *pDelData, int32_t idx, SDelDataItem *pItem) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tDelDataPutItem(SDelData *pDelData, SDelDataItem *pItem) { + int32_t code = 0; + // TODO + return code; +} + int32_t tPutDelData(uint8_t *p, SDelData *pDelData) { int32_t n = 0;