diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index d5e715068f..1a95d90a21 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -90,9 +90,10 @@ typedef struct SDataFReader SDataFReader; typedef struct SDelFWriter SDelFWriter; int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); -int32_t tsdbDelFWriterClose(SDelFWriter *pWriter); +int32_t tsdbDelFWriterClose(SDelFWriter *pWriter, int8_t sync); int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppBuf, SDelIdxItem *pItem); int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SDelIdx *pDelIdx, uint8_t **ppBuf); +int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter, uint8_t **ppBuf); // SDelFReader typedef struct SDelFReader SDelFReader; @@ -154,6 +155,8 @@ int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx); int32_t tPutDelData(uint8_t *p, SDelData *pDelData); int32_t tGetDelData(uint8_t *p, SDelData *pDelData); +int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile); +int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile); // structs typedef struct { int minFid; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 7705af3e2f..4245eaf5ea 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -22,6 +22,8 @@ struct SCommitter { uint8_t *pBuf1; uint8_t *pBuf2; uint8_t *pBuf3; + uint8_t *pBuf4; + uint8_t *pBuf5; /* commit data */ int32_t minutes; int8_t precision; @@ -171,6 +173,8 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { SDelFile *pDelFileR = NULL; // TODO SDelFile *pDelFileW = NULL; // TODO + // load old + pCommitter->oDelIdx = (SDelIdx){0}; if (pDelFileR) { code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL); if (code) { @@ -183,6 +187,8 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { } } + // prepare new + pCommitter->nDelIdx = (SDelIdx){0}; code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW, pTsdb); if (code) { goto _err; @@ -250,12 +256,17 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) { static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { int32_t code = 0; - code = tsdbWriteDelIdx(pCommitter->pDelFWriter, &pCommitter->nDelIdx, &pCommitter->pBuf3); + code = tsdbWriteDelIdx(pCommitter->pDelFWriter, &pCommitter->nDelIdx, NULL); if (code) { goto _err; } - code = tsdbDelFWriterClose(pCommitter->pDelFWriter); + code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter, NULL); + if (code) { + goto _err; + } + + code = tsdbDelFWriterClose(pCommitter->pDelFWriter, 1); if (code) { goto _err; } @@ -268,6 +279,7 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { return code; _err: + tsdbError("vgId:%d commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 7a44fd799f..e14d09e6c1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -114,9 +114,25 @@ _err: return code; } -int32_t tsdbDelFWriterClose(SDelFWriter *pWriter) { +int32_t tsdbDelFWriterClose(SDelFWriter *pWriter, int8_t sync) { int32_t code = 0; - // TODO + + // sync + if (sync && taosFsyncFile(pWriter->pWriteH) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // close + if (taosCloseFile(&pWriter->pWriteH) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + return code; + +_err: + tsdbError("vgId:%d failed to close del file writer since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } @@ -129,35 +145,87 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppB } int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SDelIdx *pDelIdx, uint8_t **ppBuf) { - int32_t code = 0; - int64_t size; + int32_t code = 0; + int64_t size; + int64_t n; + uint8_t *pBuf = NULL; - size = tPutDelIdx(NULL, pDelIdx) + sizeof(TSCKSUM); + // prepare + pDelIdx->delimiter = TSDB_FILE_DLMT; + // pDelIdx->nOffset = (todo) // alloc + if (!ppBuf) ppBuf = &pBuf; + size = tPutDelIdx(NULL, pDelIdx) + sizeof(TSCKSUM); code = tsdbRealloc(ppBuf, size); if (code) { goto _err; } - // encode - tPutDelIdx(*ppBuf, pDelIdx); - - // checksum + // build + n = tPutDelIdx(*ppBuf, pDelIdx); taosCalcChecksumAppend(0, *ppBuf, size); + ASSERT(n + sizeof(TSCKSUM) == size); + + // write + n = taosWriteFile(pWriter->pWriteH, *ppBuf, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + ASSERT(n == size); + + // update + pWriter->pFile->offset = pWriter->pFile->size; + pWriter->pFile->size += size; + + tsdbFree(pBuf); + return code; + +_err: + tsdbError("vgId:%d failed to write del idx since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tsdbFree(pBuf); + return code; +} + +int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter, uint8_t **ppBuf) { + int32_t code = 0; + uint8_t *pBuf = NULL; + int64_t size = TSDB_FHDR_SIZE; + int64_t n; + + // alloc + if (!ppBuf) ppBuf = &pBuf; + code = tsdbRealloc(ppBuf, size); + if (code) goto _err; + + // build + memset(*ppBuf, 0, size); + n = tPutDelFileHdr(*ppBuf, pWriter->pFile); + taosCalcChecksumAppend(0, *ppBuf, size); + + ASSERT(n <= size - sizeof(TSCKSUM)); + + // seek + if (taosLSeekFile(pWriter->pWriteH, 0, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + // write if (taosWriteFile(pWriter->pWriteH, *ppBuf, size) < size) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - pWriter->pFile->offset = pWriter->pFile->size; - pWriter->pFile->size += size; - + tsdbFree(pBuf); return code; _err: + tsdbError("vgId:%d failed to update del file header since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tsdbFree(pBuf); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 0fa4e17b6e..df6d13ff7f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -130,4 +130,30 @@ int32_t tGetDelData(uint8_t *p, SDelData *pDelData) { n += tGetBinary(p, &pDelData->pData, &pDelData->nData); return n; -} \ No newline at end of file +} + +int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile) { + int32_t n = 0; + + n += tPutI64(p ? p + n : p, pDelFile->minKey); + n += tPutI64(p ? p + n : p, pDelFile->maxKey); + n += tPutI64v(p ? p + n : p, pDelFile->minVersion); + n += tPutI64v(p ? p + n : p, pDelFile->maxVersion); + n += tPutI64v(p ? p + n : p, pDelFile->size); + n += tPutI64v(p ? p + n : p, pDelFile->offset); + + return n; +} + +int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile) { + int32_t n = 0; + + n += tGetI64(p, &pDelFile->minKey); + n += tGetI64(p, &pDelFile->maxKey); + n += tGetI64v(p, &pDelFile->minVersion); + n += tGetI64v(p, &pDelFile->maxVersion); + n += tGetI64v(p, &pDelFile->size); + n += tGetI64v(p, &pDelFile->offset); + + return n; +}