more code
This commit is contained in:
parent
207d69e61e
commit
36d01e84a5
|
@ -1170,8 +1170,8 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb
|
||||||
pDelFWriter->fDel = *pFile;
|
pDelFWriter->fDel = *pFile;
|
||||||
|
|
||||||
tsdbDelFileName(pTsdb, pFile, fname);
|
tsdbDelFileName(pTsdb, pFile, fname);
|
||||||
code =
|
int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE;
|
||||||
tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH);
|
code = tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, flag, &pDelFWriter->pWriteH);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// update header
|
// update header
|
||||||
|
|
|
@ -1175,10 +1175,40 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSnapMoveWriteDelData(STsdbSnapWriter* pWriter, TABLEID* pId) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
if (pWriter->iDelIdx >= taosArrayGetSize(pWriter->aDelIdxR)) break;
|
||||||
|
|
||||||
|
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
|
||||||
|
|
||||||
|
if (tTABLEIDCmprFn(pDelIdx, pId) >= 0) break;
|
||||||
|
|
||||||
|
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
SDelIdx delIdx = *pDelIdx;
|
||||||
|
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
pWriter->iDelIdx++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STsdb* pTsdb = pWriter->pTsdb;
|
STsdb* pTsdb = pWriter->pTsdb;
|
||||||
|
|
||||||
|
// Open del file if not opened yet
|
||||||
if (pWriter->pDelFWriter == NULL) {
|
if (pWriter->pDelFWriter == NULL) {
|
||||||
SDelFile* pDelFile = pWriter->fs.pDelFile;
|
SDelFile* pDelFile = pWriter->fs.pDelFile;
|
||||||
|
|
||||||
|
@ -1189,38 +1219,28 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
|
||||||
|
|
||||||
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR);
|
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
} else {
|
||||||
|
taosArrayClear(pWriter->aDelIdxR);
|
||||||
}
|
}
|
||||||
|
pWriter->iDelIdx = 0;
|
||||||
|
|
||||||
// writer
|
// writer
|
||||||
SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0};
|
SDelFile delFile = {.commitID = pWriter->commitID};
|
||||||
code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
|
code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
taosArrayClear(pWriter->aDelIdxW);
|
||||||
}
|
}
|
||||||
|
|
||||||
// process the del data
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
||||||
TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr));
|
TABLEID id = *(TABLEID*)pHdr->data;
|
||||||
|
|
||||||
while (true) {
|
ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData);
|
||||||
if (pWriter->iDelIdx >= taosArrayGetSize(pWriter->aDelIdxR)) break;
|
|
||||||
if (tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) >= 0) break;
|
|
||||||
|
|
||||||
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
|
// Move write data < id
|
||||||
|
code = tsdbSnapMoveWriteDelData(pWriter, &id);
|
||||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
|
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
SDelIdx delIdx = *pDelIdx;
|
// Merge incoming data with current
|
||||||
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
pWriter->iDelIdx++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR) &&
|
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR) &&
|
||||||
tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) == 0) {
|
tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) == 0) {
|
||||||
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
|
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
|
||||||
|
@ -1269,22 +1289,10 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
|
||||||
|
|
||||||
if (pWriter->pDelFWriter == NULL) goto _exit;
|
if (pWriter->pDelFWriter == NULL) goto _exit;
|
||||||
|
|
||||||
for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR); pWriter->iDelIdx++) {
|
TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX};
|
||||||
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
|
code = tsdbSnapMoveWriteDelData(pWriter, &id);
|
||||||
|
|
||||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
|
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
SDelIdx delIdx = *pDelIdx;
|
|
||||||
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
if (taosArrayPush(pWriter->aDelIdxR, &delIdx) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
|
code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
|
|
|
@ -354,7 +354,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
||||||
code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
|
code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
} break;
|
} break;
|
||||||
case SNAP_DATA_TSDB: {
|
case SNAP_DATA_TSDB:
|
||||||
|
case SNAP_DATA_DEL: {
|
||||||
// tsdb
|
// tsdb
|
||||||
if (pWriter->pTsdbSnapWriter == NULL) {
|
if (pWriter->pTsdbSnapWriter == NULL) {
|
||||||
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
|
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
|
||||||
|
|
Loading…
Reference in New Issue