refact more code
This commit is contained in:
parent
545e4d7f02
commit
fb2b42f968
|
@ -21,6 +21,7 @@ extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SB
|
|||
#define TSDB_MEM_TABLE_DATA_ITER 0
|
||||
#define TSDB_DATA_FILE_DATA_ITER 1
|
||||
#define TSDB_STT_FILE_DATA_ITER 2
|
||||
#define TSDB_TOMB_FILE_DATA_ITER 3
|
||||
|
||||
typedef struct STsdbDataIter2 STsdbDataIter2;
|
||||
struct STsdbDataIter2 {
|
||||
|
@ -55,6 +56,14 @@ struct STsdbDataIter2 {
|
|||
int32_t iSttBlk;
|
||||
int32_t iRow;
|
||||
} sIter;
|
||||
// TSDB_TOMB_FILE_DATA_ITER
|
||||
struct {
|
||||
SDelFReader* pReader;
|
||||
SArray* aDelIdx;
|
||||
SArray* aDelData;
|
||||
int32_t iDelIdx;
|
||||
int32_t iDelData;
|
||||
} tIter;
|
||||
};
|
||||
};
|
||||
|
||||
|
@ -152,6 +161,49 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbOpenTombFileDataIter(SDelFReader* pReader, STsdbDataIter2** ppIter) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
|
||||
if (pIter == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
pIter->type = TSDB_TOMB_FILE_DATA_ITER;
|
||||
|
||||
pIter->tIter.pReader = pReader;
|
||||
if ((pIter->tIter.aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
if ((pIter->tIter.aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbReadDelIdx(pReader, pIter->tIter.aDelIdx);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (taosArrayGetSize(pIter->tIter.aDelIdx) == 0) goto _clear;
|
||||
|
||||
pIter->tIter.iDelIdx = 0;
|
||||
pIter->tIter.iDelData = 0;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
if (pIter) {
|
||||
_clear:
|
||||
taosArrayDestroy(pIter->tIter.aDelIdx);
|
||||
taosArrayDestroy(pIter->tIter.aDelData);
|
||||
taosMemoryFree(pIter);
|
||||
pIter = NULL;
|
||||
}
|
||||
}
|
||||
*ppIter = pIter;
|
||||
return code;
|
||||
}
|
||||
|
||||
/* close */
|
||||
static void tsdbCloseDataFileDataIter(STsdbDataIter2* pIter) {
|
||||
tBlockDataDestroy(&pIter->dIter.bData, 1);
|
||||
|
@ -921,19 +973,19 @@ struct STsdbSnapWriter {
|
|||
uint8_t* aBuf[5];
|
||||
|
||||
STsdbFS fs;
|
||||
TABLEID tbid;
|
||||
|
||||
// time-series data
|
||||
SBlockData inData;
|
||||
|
||||
int32_t fid;
|
||||
TABLEID tbid;
|
||||
SSkmInfo skmTable;
|
||||
|
||||
/* reader */
|
||||
SDataFReader* pDataFReader;
|
||||
STsdbDataIter2* iterList;
|
||||
STsdbDataIter2* pDIter;
|
||||
STsdbDataIter2* pIter;
|
||||
STsdbDataIter2* pSIter;
|
||||
SRBTree rbt; // SRBTree<STsdbDataIter2>
|
||||
|
||||
/* writer */
|
||||
|
@ -945,12 +997,14 @@ struct STsdbSnapWriter {
|
|||
SBlockData sData;
|
||||
|
||||
// tombstone data
|
||||
SDelFReader* pDelFReader;
|
||||
/* reader */
|
||||
SDelFReader* pDelFReader;
|
||||
STsdbDataIter2* pTIter;
|
||||
|
||||
/* writer */
|
||||
SDelFWriter* pDelFWriter;
|
||||
int32_t iDelIdx;
|
||||
SArray* aDelIdxR;
|
||||
SArray* aDelIdx;
|
||||
SArray* aDelData;
|
||||
SArray* aDelIdxW;
|
||||
};
|
||||
|
||||
// SNAP_DATA_TSDB
|
||||
|
@ -1207,6 +1261,7 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
|||
|
||||
if (pWriter->bData.nRow > 0) {
|
||||
if (pWriter->bData.nRow < pWriter->minRow) {
|
||||
ASSERT(TABLE_SAME_SCHEMA(pWriter->sData.suid, pWriter->sData.uid, pWriter->tbid.suid, pWriter->tbid.uid));
|
||||
for (int32_t iRow = 0; iRow < pWriter->bData.nRow; iRow++) {
|
||||
code =
|
||||
tBlockDataAppendRow(&pWriter->sData, &tsdbRowFromBlockData(&pWriter->bData, iRow), NULL, pWriter->tbid.uid);
|
||||
|
@ -1262,7 +1317,7 @@ static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid)
|
|||
pWriter->pDataFReader = NULL;
|
||||
pWriter->iterList = NULL;
|
||||
pWriter->pDIter = NULL;
|
||||
pWriter->pIter = NULL;
|
||||
pWriter->pSIter = NULL;
|
||||
tRBTreeCreate(&pWriter->rbt, tsdbDataIterCmprFn);
|
||||
if (pSet) {
|
||||
code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
|
||||
|
@ -1276,23 +1331,23 @@ static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid)
|
|||
}
|
||||
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||
code = tsdbOpenSttFileDataIter(pWriter->pDataFReader, iStt, &pWriter->pIter);
|
||||
code = tsdbOpenSttFileDataIter(pWriter->pDataFReader, iStt, &pWriter->pSIter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (pWriter->pIter) {
|
||||
code = tsdbSttFileDataIterNext(pWriter->pIter);
|
||||
if (pWriter->pSIter) {
|
||||
code = tsdbSttFileDataIterNext(pWriter->pSIter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// add to tree
|
||||
tRBTreePut(&pWriter->rbt, &pWriter->pIter->rbtn);
|
||||
tRBTreePut(&pWriter->rbt, &pWriter->pSIter->rbtn);
|
||||
|
||||
// add to list
|
||||
pWriter->pIter->next = pWriter->iterList;
|
||||
pWriter->iterList = pWriter->pIter;
|
||||
pWriter->pSIter->next = pWriter->iterList;
|
||||
pWriter->iterList = pWriter->pSIter;
|
||||
}
|
||||
}
|
||||
|
||||
pWriter->pIter = NULL;
|
||||
pWriter->pSIter = NULL;
|
||||
}
|
||||
|
||||
// open writer
|
||||
|
@ -1348,7 +1403,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn
|
|||
|
||||
// switch to new table if need
|
||||
if (pRowInfo == NULL || pRowInfo->uid != pWriter->tbid.uid) {
|
||||
if (pWriter->tbid.uid != 0) {
|
||||
if (pWriter->tbid.uid) {
|
||||
code = tsdbSnapWriteTableDataEnd(pWriter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
@ -1357,11 +1412,10 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// end with a NULL row
|
||||
if (pRowInfo) {
|
||||
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
if (pRowInfo == NULL) goto _exit;
|
||||
|
||||
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -1374,19 +1428,19 @@ static int32_t tsdbSnapWriteNextRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowIn
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (pWriter->pIter) {
|
||||
code = tsdbDataIterNext2(pWriter->pIter);
|
||||
if (pWriter->pSIter) {
|
||||
code = tsdbDataIterNext2(pWriter->pSIter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (pWriter->pIter->rowInfo.suid == 0 && pWriter->pIter->rowInfo.uid == 0) {
|
||||
pWriter->pIter = NULL;
|
||||
if (pWriter->pSIter->rowInfo.suid == 0 && pWriter->pSIter->rowInfo.uid == 0) {
|
||||
pWriter->pSIter = NULL;
|
||||
} else {
|
||||
SRBTreeNode* pNode = tRBTreeMin(&pWriter->rbt);
|
||||
if (pNode) {
|
||||
int32_t c = tsdbDataIterCmprFn(&pWriter->pIter->rbtn, pNode);
|
||||
int32_t c = tsdbDataIterCmprFn(&pWriter->pSIter->rbtn, pNode);
|
||||
if (c > 0) {
|
||||
tRBTreePut(&pWriter->rbt, &pWriter->pIter->rbtn);
|
||||
pWriter->pIter = NULL;
|
||||
tRBTreePut(&pWriter->rbt, &pWriter->pSIter->rbtn);
|
||||
pWriter->pSIter = NULL;
|
||||
} else if (c == 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -1394,17 +1448,17 @@ static int32_t tsdbSnapWriteNextRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowIn
|
|||
}
|
||||
}
|
||||
|
||||
if (pWriter->pIter == NULL) {
|
||||
if (pWriter->pSIter == NULL) {
|
||||
SRBTreeNode* pNode = tRBTreeMin(&pWriter->rbt);
|
||||
if (pNode) {
|
||||
tRBTreeDrop(&pWriter->rbt, pNode);
|
||||
pWriter->pIter = TSDB_RBTN_TO_DATA_ITER(pNode);
|
||||
pWriter->pSIter = TSDB_RBTN_TO_DATA_ITER(pNode);
|
||||
}
|
||||
}
|
||||
|
||||
if (ppRowInfo) {
|
||||
if (pWriter->pIter) {
|
||||
*ppRowInfo = &pWriter->pIter->rowInfo;
|
||||
if (pWriter->pSIter) {
|
||||
*ppRowInfo = &pWriter->pSIter->rowInfo;
|
||||
} else {
|
||||
*ppRowInfo = NULL;
|
||||
}
|
||||
|
@ -1421,8 +1475,8 @@ static int32_t tsdbSnapWriteGetRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInf
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (pWriter->pIter) {
|
||||
*ppRowInfo = &pWriter->pIter->rowInfo;
|
||||
if (pWriter->pSIter) {
|
||||
*ppRowInfo = &pWriter->pSIter->rowInfo;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
|
@ -1558,143 +1612,238 @@ _exit:
|
|||
}
|
||||
|
||||
// SNAP_DATA_DEL
|
||||
static int32_t tsdbSnapMoveWriteDelData(STsdbSnapWriter* pWriter, TABLEID* pId) {
|
||||
static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
while (true) {
|
||||
if (pWriter->iDelIdx >= taosArrayGetSize(pWriter->aDelIdxR)) break;
|
||||
if (pId) {
|
||||
pWriter->tbid = *pId;
|
||||
} else {
|
||||
pWriter->tbid = (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX};
|
||||
}
|
||||
|
||||
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
|
||||
if (pWriter->pTIter) {
|
||||
ASSERT(pWriter->pTIter->tIter.iDelData >= taosArrayGetSize(pWriter->pTIter->tIter.aDelData));
|
||||
|
||||
if (tTABLEIDCmprFn(pDelIdx, pId) >= 0) break;
|
||||
for (;;) {
|
||||
if (pWriter->pTIter->tIter.iDelIdx >= taosArrayGetSize(pWriter->pTIter->tIter.aDelIdx)) {
|
||||
break;
|
||||
}
|
||||
|
||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
|
||||
if (code) goto _exit;
|
||||
SDelIdx* pDelIdx = taosArrayGet(pWriter->pTIter->tIter.aDelIdx, pWriter->pTIter->tIter.iDelIdx);
|
||||
|
||||
SDelIdx delIdx = *pDelIdx;
|
||||
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
|
||||
if (code) goto _exit;
|
||||
int32_t c = tTABLEIDCmprFn(pDelIdx, &pWriter->tbid);
|
||||
if (c < 0) {
|
||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
|
||||
SDelIdx* pDelIdxNew = taosArrayReserve(pWriter->pTIter->tIter.aDelIdx, 1);
|
||||
if (pDelIdxNew == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
pDelIdxNew->suid = pDelIdx->suid;
|
||||
pDelIdxNew->uid = pDelIdx->uid;
|
||||
|
||||
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->pTIter->tIter.aDelData, pDelIdxNew);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
pWriter->pTIter->tIter.iDelIdx++;
|
||||
} else if (c == 0) {
|
||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (taosArrayAddBatch(pWriter->aDelData, pWriter->pTIter->tIter.aDelData->pData,
|
||||
taosArrayGetSize(pWriter->pTIter->tIter.aDelData)) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
pWriter->pTIter->tIter.iDelData = taosArrayGetSize(pWriter->pTIter->tIter.aDelData);
|
||||
pWriter->pTIter->tIter.iDelIdx++;
|
||||
break;
|
||||
} else {
|
||||
pWriter->pTIter->tIter.iDelData = taosArrayGetSize(pWriter->pTIter->tIter.aDelData);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayClear(pWriter->aDelData);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, pId->suid,
|
||||
pId->uid);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteDelTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (taosArrayGetSize(pWriter->aDelData) > 0) {
|
||||
SDelIdx* pDelIdx = taosArrayReserve(pWriter->aDelIdx, 1);
|
||||
if (pDelIdx == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
pWriter->iDelIdx++;
|
||||
pDelIdx->suid = pWriter->tbid.suid;
|
||||
pDelIdx->uid = pWriter->tbid.uid;
|
||||
|
||||
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, pDelIdx);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbTrace("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteDelTableData(STsdbSnapWriter* pWriter, TABLEID* pId, uint8_t* pData, int64_t size) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (pId == NULL || pId->uid != pWriter->tbid.uid) {
|
||||
if (pWriter->tbid.uid) {
|
||||
code = tsdbSnapWriteDelTableDataEnd(pWriter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbSnapWriteDelTableDataStart(pWriter, pId);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (pId == NULL) goto _exit;
|
||||
|
||||
int64_t n = 0;
|
||||
while (n < size) {
|
||||
SDelData delData;
|
||||
n += tGetDelData(pData + n, &delData);
|
||||
|
||||
if (taosArrayPush(pWriter->aDelData, &delData) < 0) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(n == size);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteDelDataStart(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STsdb* pTsdb = pWriter->pTsdb;
|
||||
SDelFile* pDelFile = pWriter->fs.pDelFile;
|
||||
|
||||
pWriter->tbid = (TABLEID){0};
|
||||
|
||||
// reader
|
||||
if (pDelFile) {
|
||||
code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbOpenTombFileDataIter(pWriter->pDelFReader, &pWriter->pTIter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// writer
|
||||
code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &(SDelFile){.commitID = pWriter->commitID}, pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if ((pWriter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
if ((pWriter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteDelDataEnd(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STsdb* pTsdb = pWriter->pTsdb;
|
||||
|
||||
// end remaining table (TODO)
|
||||
code = tsdbSnapWriteDelTableData(pWriter, NULL, NULL, 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// update file-level info
|
||||
code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdx);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbFSUpsertDelFile(&pWriter->fs, &pWriter->pDelFWriter->fDel);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (pWriter->pDelFReader) {
|
||||
code = tsdbDelFReaderClose(&pWriter->pDelFReader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteDelData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
|
||||
int32_t code = 0;
|
||||
STsdb* pTsdb = pWriter->pTsdb;
|
||||
int32_t lino = 0;
|
||||
|
||||
// Open del file if not opened yet
|
||||
STsdb* pTsdb = pWriter->pTsdb;
|
||||
|
||||
// start to write del data if need
|
||||
if (pWriter->pDelFWriter == NULL) {
|
||||
SDelFile* pDelFile = pWriter->fs.pDelFile;
|
||||
|
||||
// reader
|
||||
if (pDelFile) {
|
||||
code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
taosArrayClear(pWriter->aDelIdxR);
|
||||
}
|
||||
pWriter->iDelIdx = 0;
|
||||
|
||||
// writer
|
||||
SDelFile delFile = {.commitID = pWriter->commitID};
|
||||
code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
|
||||
if (code) goto _err;
|
||||
taosArrayClear(pWriter->aDelIdxW);
|
||||
code = tsdbSnapWriteDelDataStart(pWriter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
TABLEID id = *(TABLEID*)pHdr->data;
|
||||
code = tsdbSnapWriteDelTableData(pWriter, (TABLEID*)pHdr->data, pHdr->data + sizeof(TABLEID),
|
||||
pHdr->size - sizeof(TABLEID));
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// Move write data < id
|
||||
code = tsdbSnapMoveWriteDelData(pWriter, &id);
|
||||
if (code) goto _err;
|
||||
|
||||
// Merge incoming data with current
|
||||
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR) &&
|
||||
tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) == 0) {
|
||||
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
|
||||
|
||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
|
||||
if (code) goto _err;
|
||||
|
||||
pWriter->iDelIdx++;
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed since %s", TD_VID(pTsdb->pVnode), __func__, tstrerror(code));
|
||||
} else {
|
||||
taosArrayClear(pWriter->aDelData);
|
||||
tsdbTrace("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
|
||||
}
|
||||
|
||||
int64_t n = sizeof(TABLEID);
|
||||
while (n < pHdr->size) {
|
||||
SDelData delData;
|
||||
|
||||
n += tGetDelData(pHdr->data + n, &delData);
|
||||
|
||||
if (taosArrayPush(pWriter->aDelData, &delData) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
SDelIdx delIdx = {.suid = id.suid, .uid = id.uid};
|
||||
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, vnode snapshot tsdb write del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
STsdb* pTsdb = pWriter->pTsdb;
|
||||
|
||||
if (pWriter->pDelFWriter == NULL) return code;
|
||||
|
||||
TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX};
|
||||
code = tsdbSnapMoveWriteDelData(pWriter, &id);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdxW);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbFSUpsertDelFile(&pWriter->fs, &pWriter->pDelFWriter->fDel);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pWriter->pDelFReader) {
|
||||
code = tsdbDelFReaderClose(&pWriter->pDelFReader);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
tsdbInfo("vgId:%d, vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, vnode snapshot tsdb write del end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1738,7 +1887,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
if (pWriter) {
|
||||
tBlockDataDestroy(&pWriter->sData, 1);
|
||||
tBlockDataDestroy(&pWriter->bData, 1);
|
||||
|
@ -1755,20 +1904,26 @@ _exit:
|
|||
|
||||
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (pWriter->pDataFWriter) {
|
||||
code = tsdbSnapWriteFileDataEnd(pWriter);
|
||||
if (code) goto _exit;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbSnapWriteDelEnd(pWriter);
|
||||
if (code) goto _exit;
|
||||
if (pWriter->pDelFWriter) {
|
||||
code = tsdbSnapWriteDelDataEnd(pWriter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs);
|
||||
if (code) goto _exit;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbDebug("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1795,9 +1950,8 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
|
|||
}
|
||||
|
||||
// SNAP_DATA_DEL
|
||||
taosArrayDestroy(pWriter->aDelIdxW);
|
||||
taosArrayDestroy(pWriter->aDelData);
|
||||
taosArrayDestroy(pWriter->aDelIdxR);
|
||||
taosArrayDestroy(pWriter->aDelIdx);
|
||||
|
||||
// SNAP_DATA_TSDB
|
||||
tBlockDataDestroy(&pWriter->sData, 1);
|
||||
|
|
Loading…
Reference in New Issue