diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 628a2461f8..0e44a636c6 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -236,14 +236,14 @@ int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA); // SDelFWriter int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFWriterClose(SDelFWriter *pWriter, int8_t sync); -int32_t tsdbWriteDelData(SDelFWriter *pWriter, SMapData *pDelDataMap, uint8_t **ppBuf, SDelIdx *pDelIdx); -int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SMapData *pDelIdxMap, uint8_t **ppBuf); -int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter, uint8_t **ppBuf); +int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, uint8_t **ppBuf, SDelIdx *pDelIdx); +int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx, uint8_t **ppBuf); +int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter); // SDelFReader int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb, uint8_t **ppBuf); int32_t tsdbDelFReaderClose(SDelFReader *pReader); -int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SMapData *pDelDataMap, uint8_t **ppBuf); -int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppBuf); +int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, uint8_t **ppBuf); +int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf); // tsdbCache int32_t tsdbOpenCache(STsdb *pTsdb); @@ -470,10 +470,6 @@ struct SDelIdx { struct SDelFile { int64_t commitID; - TSKEY minKey; - TSKEY maxKey; - int64_t minVersion; - int64_t maxVersion; int64_t size; int64_t offset; }; @@ -557,6 +553,12 @@ struct STsdbFS { STsdbFSState *nState; }; +struct SDelFWriter { + STsdb *pTsdb; + SDelFile fDel; + TdFilePtr pWriteH; +}; + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 07c821550d..5fc36cf9be 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -45,13 +45,10 @@ typedef struct { STSchema *pTSchema; /* commit del */ SDelFReader *pDelFReader; - SMapData oDelIdxMap; // SMapData, old - SMapData oDelDataMap; // SMapData, old SDelFWriter *pDelFWriter; - SMapData nDelIdxMap; // SMapData, new - SMapData nDelDataMap; // SMapData, new - SArray *aDelIdx; - SArray *aDelData; + SArray *aDelIdx; // SArray + SArray *aDelIdxN; // SArray + SArray *aDelData; // SArray } SCommitter; static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter); @@ -121,23 +118,37 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; - SDelFile *pDelFileR = NULL; // TODO - SDelFile *pDelFileW = NULL; // TODO - tMapDataReset(&pCommitter->oDelIdxMap); - tMapDataReset(&pCommitter->nDelIdxMap); + pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); + if (pCommitter->aDelIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } - // load old + pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData)); + if (pCommitter->aDelData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx)); + if (pCommitter->aDelIdxN == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + SDelFile *pDelFileR = pTsdb->fs->nState->pDelFile; if (pDelFileR) { code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL); if (code) goto _err; - code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->oDelIdxMap, NULL); + code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx, NULL); if (code) goto _err; } // prepare new - code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW, pTsdb); + SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0}; + code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb); if (code) goto _err; _exit: @@ -151,60 +162,51 @@ _err: static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) { int32_t code = 0; - SDelData *pDelData = &(SDelData){}; + SDelData *pDelData; tb_uid_t suid; tb_uid_t uid; - SDelIdx delIdx; // TODO - // check no del data, just return - if (pTbData && pTbData->pHead == NULL) { - pTbData = NULL; - } - if (pTbData == NULL && pDelIdx == NULL) goto _exit; + taosArrayClear(pCommitter->aDelData); - // prepare if (pTbData) { - delIdx.suid = pTbData->suid; - delIdx.uid = pTbData->uid; - } else { - delIdx.suid = pDelIdx->suid; - delIdx.uid = pDelIdx->uid; - } + suid = pTbData->suid; + uid = pTbData->uid; - // start - tMapDataReset(&pCommitter->oDelDataMap); - tMapDataReset(&pCommitter->nDelDataMap); + if (pTbData->pHead == NULL) { + pTbData = NULL; + } + } if (pDelIdx) { - code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, &pCommitter->oDelDataMap, NULL); + suid = pDelIdx->suid; + uid = pDelIdx->uid; + + code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, NULL); if (code) goto _err; } - // disk - for (int32_t iDelData = 0; iDelData < pCommitter->oDelDataMap.nItem; iDelData++) { - code = tMapDataGetItemByIdx(&pCommitter->oDelDataMap, iDelData, pDelData, tGetDelData); - if (code) goto _err; + if (pTbData == NULL && pDelIdx == NULL) goto _exit; - code = tMapDataPutItem(&pCommitter->nDelDataMap, pDelData, tPutDelData); - if (code) goto _err; - } + SDelIdx delIdx = {.suid = suid, .uid = uid}; // memory pDelData = pTbData ? pTbData->pHead : NULL; for (; pDelData; pDelData = pDelData->pNext) { - code = tMapDataPutItem(&pCommitter->nDelDataMap, pDelData, tPutDelData); - if (code) goto _err; + if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } } - ASSERT(pCommitter->nDelDataMap.nItem > 0); - // write - code = tsdbWriteDelData(pCommitter->pDelFWriter, &pCommitter->nDelDataMap, NULL, &delIdx); + code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, NULL, &delIdx); if (code) goto _err; // put delIdx - code = tMapDataPutItem(&pCommitter->nDelIdxMap, &delIdx, tPutDelIdx); - if (code) goto _err; + if (taosArrayPush(pCommitter->aDelIdx, &delIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } _exit: return code; @@ -219,30 +221,23 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) { STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; int32_t iDelIdx = 0; - int32_t nDelIdx = pCommitter->oDelIdxMap.nItem; + int32_t nDelIdx = taosArrayGetSize(pCommitter->aDelIdx); int32_t iTbData = 0; int32_t nTbData = taosArrayGetSize(pMemTable->aTbData); STbData *pTbData; SDelIdx *pDelIdx; - SDelIdx delIdx; - int32_t c; ASSERT(nTbData > 0); pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - if (iDelIdx < nDelIdx) { - code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx); - if (code) goto _err; - pDelIdx = &delIdx; - } else { - pDelIdx = NULL; - } + pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL; while (true) { if (pTbData == NULL && pDelIdx == NULL) break; if (pTbData && pDelIdx) { - c = tTABLEIDCmprFn(pTbData, pDelIdx); + int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx); + if (c == 0) { goto _commit_mem_and_disk_del; } else if (c < 0) { @@ -258,44 +253,27 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) { _commit_mem_del: code = tsdbCommitTableDel(pCommitter, pTbData, NULL); if (code) goto _err; + iTbData++; - if (iTbData < nTbData) { - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - } else { - pTbData = NULL; - } + pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL; continue; _commit_disk_del: code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx); if (code) goto _err; + iDelIdx++; - if (iDelIdx < nDelIdx) { - code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx); - if (code) goto _err; - pDelIdx = &delIdx; - } else { - pDelIdx = NULL; - } + pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL; continue; _commit_mem_and_disk_del: code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx); if (code) goto _err; + iTbData++; + pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL; iDelIdx++; - if (iTbData < nTbData) { - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - } else { - pTbData = NULL; - } - if (iDelIdx < nDelIdx) { - code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx); - if (code) goto _err; - pDelIdx = &delIdx; - } else { - pDelIdx = NULL; - } + pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL; continue; } @@ -308,11 +286,15 @@ _err: static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { int32_t code = 0; + STsdb *pTsdb = pCommitter->pTsdb; - code = tsdbWriteDelIdx(pCommitter->pDelFWriter, &pCommitter->nDelIdxMap, NULL); + code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN, NULL); if (code) goto _err; - code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter, NULL); + code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter); + if (code) goto _err; + + code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pCommitter->pDelFWriter->fDel); if (code) goto _err; code = tsdbDelFWriterClose(pCommitter->pDelFWriter, 1); @@ -323,6 +305,10 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { if (code) goto _err; } + taosArrayDestroy(pCommitter->aDelIdx); + taosArrayDestroy(pCommitter->aDelData); + taosArrayDestroy(pCommitter->aDelIdxN); + return code; _err: diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 5a4dd83072..e7f8cb4789 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -283,10 +283,7 @@ void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) { int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile) { int32_t n = 0; - n += tPutI64(p ? p + n : p, pDelFile->minKey); - n += tPutI64(p ? p + n : p, pDelFile->maxKey); - n += tPutI64v(p ? p + n : p, pDelFile->minVersion); - n += tPutI64v(p ? p + n : p, pDelFile->maxVersion); + n += tPutI64v(p ? p + n : p, pDelFile->commitID); n += tPutI64v(p ? p + n : p, pDelFile->size); n += tPutI64v(p ? p + n : p, pDelFile->offset); @@ -296,10 +293,7 @@ int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile) { int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile) { int32_t n = 0; - n += tGetI64(p + n, &pDelFile->minKey); - n += tGetI64(p + n, &pDelFile->maxKey); - n += tGetI64v(p + n, &pDelFile->minVersion); - n += tGetI64v(p + n, &pDelFile->maxVersion); + n += tGetI64v(p + n, &pDelFile->commitID); n += tGetI64v(p + n, &pDelFile->size); n += tGetI64v(p + n, &pDelFile->offset); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 3a3d235b4e..07e4e0b8ba 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -18,43 +18,45 @@ #define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) // SDelFWriter ==================================================== -struct SDelFWriter { - STsdb *pTsdb; - SDelFile *pFile; - TdFilePtr pWriteH; -}; - int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) { int32_t code = 0; - char *fname = NULL; // TODO + char fname[TSDB_FILENAME_LEN]; + char hdr[TSDB_FHDR_SIZE] = {0}; SDelFWriter *pDelFWriter; + int64_t n; + // alloc pDelFWriter = (SDelFWriter *)taosMemoryCalloc(1, sizeof(*pDelFWriter)); if (pDelFWriter == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - pDelFWriter->pTsdb = pTsdb; - pDelFWriter->pFile = pFile; + pDelFWriter->fDel = *pFile; + + tsdbDelFileName(pTsdb, pFile, fname); pDelFWriter->pWriteH = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE); if (pDelFWriter->pWriteH == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosLSeekFile(pDelFWriter->pWriteH, TSDB_FHDR_SIZE, SEEK_SET) < 0) { + // update header + n = taosWriteFile(pDelFWriter->pWriteH, &hdr, TSDB_FHDR_SIZE); + if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - pDelFWriter->pFile->size = TSDB_FHDR_SIZE; - pDelFWriter->pFile->offset = 0; + pDelFWriter->fDel.size = TSDB_FHDR_SIZE; + pDelFWriter->fDel.size = 0; + *ppWriter = pDelFWriter; return code; _err: tsdbError("vgId:%d failed to open del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + *ppWriter = NULL; return code; } @@ -80,28 +82,33 @@ _err: return code; } -int32_t tsdbWriteDelData(SDelFWriter *pWriter, SMapData *pDelDataMap, uint8_t **ppBuf, SDelIdx *pDelIdx) { - int32_t code = 0; - uint8_t *pBuf = NULL; - int64_t size = 0; - int64_t n = 0; +int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, uint8_t **ppBuf, SDelIdx *pDelIdx) { + int32_t code = 0; + uint8_t *pBuf = NULL; + int64_t size; + int64_t n; + SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pDelIdx->suid, .uid = pDelIdx->uid}; + + if (!ppBuf) ppBuf = &pBuf; // prepare - size += tPutU32(NULL, TSDB_FILE_DLMT); - size += tPutI64(NULL, pDelIdx->suid); - size += tPutI64(NULL, pDelIdx->uid); - size = size + tPutMapData(NULL, pDelDataMap) + sizeof(TSCKSUM); + size = sizeof(hdr); + for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) { + size += tPutDelData(NULL, taosArrayGet(aDelData, iDelData)); + } + size += sizeof(TSCKSUM); // alloc - if (!ppBuf) ppBuf = &pBuf; code = tsdbRealloc(ppBuf, size); if (code) goto _err; // build - n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT); - n += tPutI64(*ppBuf + n, pDelIdx->suid); - n += tPutI64(*ppBuf + n, pDelIdx->uid); - n += tPutMapData(*ppBuf + n, pDelDataMap); + n = 0; + *(SBlockDataHdr *)(*ppBuf) = hdr; + n += sizeof(hdr); + for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) { + size += tPutDelData(*ppBuf + n, taosArrayGet(aDelData, iDelData)); + } taosCalcChecksumAppend(0, *ppBuf, size); ASSERT(n + sizeof(TSCKSUM) == size); @@ -116,10 +123,9 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SMapData *pDelDataMap, uint8_t ** ASSERT(n == size); // update - pDelIdx->offset = pWriter->pFile->size; + pDelIdx->offset = pWriter->fDel.size; pDelIdx->size = size; - pWriter->pFile->offset = pWriter->pFile->size; - pWriter->pFile->size += size; + pWriter->fDel.size += size; tsdbFree(pBuf); return code; @@ -130,24 +136,33 @@ _err: return code; } -int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SMapData *pDelIdxMap, uint8_t **ppBuf) { +int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx, uint8_t **ppBuf) { int32_t code = 0; - int64_t size = 0; - int64_t n = 0; + int64_t size; + int64_t n; uint8_t *pBuf = NULL; + SDelIdx *pDelIdx; + + if (!ppBuf) ppBuf = &pBuf; // prepare + size = 0; size += tPutU32(NULL, TSDB_FILE_DLMT); - size = size + tPutMapData(NULL, pDelIdxMap) + sizeof(TSCKSUM); + for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) { + size += tPutDelIdx(NULL, taosArrayGet(aDelIdx, iDelIdx)); + } + size += sizeof(TSCKSUM); // alloc - if (!ppBuf) ppBuf = &pBuf; code = tsdbRealloc(ppBuf, size); if (code) goto _err; // build + n = 0; n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT); - n += tPutMapData(*ppBuf + n, pDelIdxMap); + for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) { + n += tPutDelIdx(*ppBuf + n, taosArrayGet(aDelIdx, iDelIdx)); + } taosCalcChecksumAppend(0, *ppBuf, size); ASSERT(n + sizeof(TSCKSUM) == size); @@ -159,11 +174,9 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SMapData *pDelIdxMap, uint8_t **pp goto _err; } - ASSERT(n == size); - // update - pWriter->pFile->offset = pWriter->pFile->size; - pWriter->pFile->size += size; + pWriter->fDel.offset = pWriter->fDel.size; + pWriter->fDel.size += size; tsdbFree(pBuf); return code; @@ -174,23 +187,16 @@ _err: return code; } -int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter, uint8_t **ppBuf) { - int32_t code = 0; - uint8_t *pBuf = NULL; - int64_t size = TSDB_FHDR_SIZE; - int64_t n; - - // alloc - if (!ppBuf) ppBuf = &pBuf; - code = tsdbRealloc(ppBuf, size); - if (code) goto _err; +int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter) { + int32_t code = 0; + char hdr[TSDB_FHDR_SIZE]; + int64_t size = TSDB_FHDR_SIZE; + int64_t n; // build - memset(*ppBuf, 0, size); - n = tPutDelFile(*ppBuf, pWriter->pFile); - taosCalcChecksumAppend(0, *ppBuf, size); - - ASSERT(n <= size - sizeof(TSCKSUM)); + memset(hdr, 0, size); + tPutDelFile(hdr, &pWriter->fDel); + taosCalcChecksumAppend(0, hdr, size); // seek if (taosLSeekFile(pWriter->pWriteH, 0, SEEK_SET) < 0) { @@ -199,30 +205,29 @@ int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter, uint8_t **ppBuf) { } // write - if (taosWriteFile(pWriter->pWriteH, *ppBuf, size) < size) { + n = taosWriteFile(pWriter->pWriteH, hdr, size); + if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - tsdbFree(pBuf); return code; _err: tsdbError("vgId:%d update del file hdr failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); - tsdbFree(pBuf); return code; } // SDelFReader ==================================================== struct SDelFReader { STsdb *pTsdb; - SDelFile *pFile; + SDelFile fDel; TdFilePtr pReadH; }; int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb, uint8_t **ppBuf) { int32_t code = 0; - char *fname = NULL; // todo + char fname[TSDB_FILENAME_LEN]; SDelFReader *pDelFReader; int64_t n; @@ -235,7 +240,9 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb // open impl pDelFReader->pTsdb = pTsdb; - pDelFReader->pFile = pFile; + pDelFReader->fDel = *pFile; + + tsdbDelFileName(pTsdb, pFile, fname); pDelFReader->pReadH = taosOpenFile(fname, TD_FILE_READ); if (pDelFReader == NULL) { code = TAOS_SYSTEM_ERROR(errno); @@ -243,6 +250,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb goto _err; } +#if 0 // load and check hdr if buffer is given if (ppBuf) { code = tsdbRealloc(ppBuf, TSDB_FHDR_SIZE); @@ -266,6 +274,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb // TODO: check the content } +#endif _exit: *ppReader = pDelFReader; @@ -292,66 +301,75 @@ _exit: return code; } -int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SMapData *pDelDataMap, uint8_t **ppBuf) { - int32_t code = 0; - int64_t n; - uint32_t delimiter; - tb_uid_t suid; - tb_uid_t uid; +int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, uint8_t **ppBuf) { + int32_t code = 0; + int64_t offset = pDelIdx->offset; + int64_t size = pDelIdx->size; + int64_t n; + uint8_t *pBuf = NULL; + SBlockDataHdr *pHdr; + SDelData *pDelData = &(SDelData){0}; - // seek - if (taosLSeekFile(pReader->pReadH, pDelIdx->offset, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // alloc - if (!ppBuf) ppBuf = &pDelDataMap->pBuf; - code = tsdbRealloc(ppBuf, pDelIdx->size); - if (code) goto _err; - - // read - n = taosReadFile(pReader->pReadH, *ppBuf, pDelIdx->size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < pDelIdx->size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // check - if (!taosCheckChecksumWhole(*ppBuf, pDelIdx->size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // // decode - n = 0; - n += tGetU32(*ppBuf + n, &delimiter); - ASSERT(delimiter == TSDB_FILE_DLMT); - n += tGetI64(*ppBuf + n, &suid); - ASSERT(suid == pDelIdx->suid); - n += tGetI64(*ppBuf + n, &uid); - ASSERT(uid == pDelIdx->uid); - n += tGetMapData(*ppBuf + n, pDelDataMap); - ASSERT(n + sizeof(TSCKSUM) == pDelIdx->size); - - return code; - -_err: - tsdbError("vgId:%d read del data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - return code; -} - -int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppBuf) { - int32_t code = 0; - int32_t n; - int64_t offset = pReader->pFile->offset; - int64_t size = pReader->pFile->size - offset; - uint32_t delimiter; - - ASSERT(ppBuf && *ppBuf); + if (!ppBuf) ppBuf = &pBuf; + + // seek + if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // alloc + code = tsdbRealloc(ppBuf, size); + if (code) goto _err; + + // read + n = taosReadFile(pReader->pReadH, *ppBuf, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // check + if (!taosCheckChecksumWhole(*ppBuf, size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // // decode + n = 0; + pHdr = (SBlockDataHdr *)(*ppBuf + n); + ASSERT(pHdr->delimiter == TSDB_FILE_DLMT); + ASSERT(pHdr->suid == pDelIdx->suid); + ASSERT(pHdr->uid == pDelIdx->uid); + n += sizeof(*pHdr); + while (n < size - sizeof(TSCKSUM)) { + n += tGetDelData(*ppBuf + n, pDelData); + } + + ASSERT(n == size - sizeof(TSCKSUM)); + + tsdbFree(pBuf); + return code; + +_err: + tsdbError("vgId:%d read del data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbFree(pBuf); + return code; +} + +int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf) { + int32_t code = 0; + int32_t n; + int64_t offset = pReader->fDel.offset; + int64_t size = pReader->fDel.size - offset; + uint32_t delimiter; + uint8_t *pBuf = NULL; + SDelIdx *pDelIdx = &(SDelIdx){}; + + if (!ppBuf) ppBuf = &pBuf; // seek if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) { @@ -360,7 +378,6 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB } // alloc - if (!ppBuf) ppBuf = &pDelIdxMap->pBuf; code = tsdbRealloc(ppBuf, size); if (code) goto _err; @@ -384,8 +401,18 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB n = 0; n += tGetU32(*ppBuf + n, &delimiter); ASSERT(delimiter == TSDB_FILE_DLMT); - n += tGetMapData(*ppBuf + n, pDelIdxMap); - ASSERT(n + sizeof(TSCKSUM) == size); + + taosArrayClear(aDelIdx); + while (n < size - sizeof(TSCKSUM)) { + n += tGetDelIdx(*ppBuf + n, pDelIdx); + + if (taosArrayPush(aDelIdx, pDelIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + + ASSERT(n == size - sizeof(TSCKSUM)); return code;