more code

This commit is contained in:
Hongze Cheng 2023-01-28 18:03:13 +08:00
parent 8c48e2545d
commit b996618e65
1 changed files with 93 additions and 219 deletions

View File

@ -16,6 +16,9 @@
#include "tsdb.h"
extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData);
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg);
// STsdbDataIter2 ========================================
#define TSDB_MEM_TABLE_DATA_ITER 0
@ -218,6 +221,12 @@ static void tsdbCloseSttFileDataIter(STsdbDataIter2* pIter) {
taosMemoryFree(pIter);
}
static void tsdbCloseTombFileDataIter(STsdbDataIter2* pIter) {
taosArrayDestroy(pIter->tIter.aDelData);
taosArrayDestroy(pIter->tIter.aDelIdx);
taosMemoryFree(pIter);
}
static void tsdbCloseDataIter2(STsdbDataIter2* pIter) {
if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) {
ASSERT(0);
@ -225,6 +234,8 @@ static void tsdbCloseDataIter2(STsdbDataIter2* pIter) {
tsdbCloseDataFileDataIter(pIter);
} else if (pIter->type == TSDB_STT_FILE_DATA_ITER) {
tsdbCloseSttFileDataIter(pIter);
} else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) {
tsdbCloseTombFileDataIter(pIter);
} else {
ASSERT(0);
}
@ -347,55 +358,32 @@ static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter) {
/* get */
// STsdbSnapReader ========================================
typedef enum { SNAP_DATA_FILE_ITER = 0, SNAP_STT_FILE_ITER } EFIterT;
typedef struct {
SRBTreeNode n;
SRowInfo rInfo;
EFIterT type;
union {
struct {
SArray* aBlockIdx;
int32_t iBlockIdx;
SBlockIdx* pBlockIdx;
SMapData mBlock;
int32_t iBlock;
}; // .data file
struct {
int32_t iStt;
SArray* aSttBlk;
int32_t iSttBlk;
}; // .stt file
};
SBlockData bData;
int32_t iRow;
} SFDataIter;
struct STsdbSnapReader {
STsdb* pTsdb;
int64_t sver;
int64_t ever;
STsdbFS fs;
int8_t type;
// for data file
uint8_t* aBuf[5];
STsdbFS fs;
SSkmInfo skmTable;
// timeseries data
int8_t dataDone;
int32_t fid;
SDataFReader* pDataFReader;
SFDataIter* pIter;
STsdbDataIter2* iterList;
STsdbDataIter2* pIter;
SRBTree rbt;
SFDataIter aFDataIter[TSDB_MAX_STT_TRIGGER + 1];
SBlockData bData;
SSkmInfo skmTable;
// for del file
// tombstone data
int8_t delDone;
SDelFReader* pDelFReader;
SArray* aDelIdx; // SArray<SDelIdx>
int32_t iDelIdx;
SArray* aDelData; // SArray<SDelData>
uint8_t* aBuf[5];
STsdbDataIter2* pTIter;
};
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
static int32_t tFDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) {
SFDataIter* pIter1 = (SFDataIter*)(((uint8_t*)pNode1) - offsetof(SFDataIter, n));
SFDataIter* pIter2 = (SFDataIter*)(((uint8_t*)pNode2) - offsetof(SFDataIter, n));
@ -647,7 +635,7 @@ _exit:
return code;
}
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
int32_t lino = 0;
@ -705,7 +693,7 @@ _exit:
return code;
}
static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
static int32_t tsdbSnapReadTombstoneData(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
int32_t lino = 0;
@ -792,10 +780,9 @@ _exit:
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) {
int32_t code = 0;
int32_t lino = 0;
STsdbSnapReader* pReader = NULL;
// alloc
pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
STsdbSnapReader* pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
@ -805,118 +792,78 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type
pReader->ever = ever;
pReader->type = type;
code = taosThreadRwlockRdlock(&pTsdb->rwLock);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosThreadRwlockRdlock(&pTsdb->rwLock);
code = tsdbFSRef(pTsdb, &pReader->fs);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosThreadRwlockUnlock(&pTsdb->rwLock);
code = taosThreadRwlockUnlock(&pTsdb->rwLock);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
TSDB_CHECK_CODE(code, lino, _exit);
}
// data
// init
pReader->fid = INT32_MIN;
for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) {
SFDataIter* pIter = &pReader->aFDataIter[iIter];
if (iIter == 0) {
pIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pIter->aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pIter->aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
code = tBlockDataCreate(&pIter->bData);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataCreate(&pReader->bData);
TSDB_CHECK_CODE(code, lino, _exit);
// del
pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
if (pReader->aDelIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pReader->aDelData = taosArrayInit(0, sizeof(SDelData));
if (pReader->aDelData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s, TSDB path: %s", TD_VID(pTsdb->pVnode), __func__, lino,
tstrerror(code), pTsdb->path);
*ppReader = NULL;
tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(pTsdb->pVnode),
__func__, lino, tstrerror(code), sver, ever, type);
if (pReader) {
taosArrayDestroy(pReader->aDelData);
taosArrayDestroy(pReader->aDelIdx);
tBlockDataDestroy(&pReader->bData, 1);
tsdbFSDestroy(&pReader->fs);
tsdbFSUnref(pTsdb, &pReader->fs);
taosMemoryFree(pReader);
pReader = NULL;
}
} else {
*ppReader = pReader;
tsdbInfo("vgId:%d, vnode snapshot tsdb reader opened for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(pTsdb->pVnode), __func__, sver, ever,
type);
}
*ppReader = pReader;
return code;
}
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
int32_t code = 0;
int32_t lino = 0;
STsdbSnapReader* pReader = *ppReader;
// data
if (pReader->pDataFReader) tsdbDataFReaderClose(&pReader->pDataFReader);
for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) {
SFDataIter* pIter = &pReader->aFDataIter[iIter];
if (iIter == 0) {
taosArrayDestroy(pIter->aBlockIdx);
tMapDataClear(&pIter->mBlock);
} else {
taosArrayDestroy(pIter->aSttBlk);
}
tBlockDataDestroy(&pIter->bData, 1);
// tombstone
if (pReader->pTIter) {
tsdbCloseDataIter2(pReader->pTIter);
pReader->pTIter = NULL;
}
if (pReader->pDelFReader) {
tsdbDelFReaderClose(&pReader->pDelFReader);
}
// timeseries
tBlockDataDestroy(&pReader->bData, 1);
while (pReader->iterList) {
STsdbDataIter2* pIter = pReader->iterList;
pReader->iterList = pIter->next;
tsdbCloseDataIter2(pIter);
}
if (pReader->pDataFReader) {
tsdbDataFReaderClose(&pReader->pDataFReader);
}
// other
tDestroyTSchema(pReader->skmTable.pTSchema);
// del
if (pReader->pDelFReader) tsdbDelFReaderClose(&pReader->pDelFReader);
taosArrayDestroy(pReader->aDelIdx);
taosArrayDestroy(pReader->aDelData);
tsdbFSUnref(pReader->pTsdb, &pReader->fs);
tsdbInfo("vgId:%d, vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path);
for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) {
tFree(pReader->aBuf[iBuf]);
}
taosMemoryFree(pReader);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s done", TD_VID(pReader->pTsdb->pVnode), __func__);
}
*ppReader = NULL;
return code;
}
@ -929,7 +876,7 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
// read data file
if (!pReader->dataDone) {
code = tsdbSnapReadData(pReader, ppData);
code = tsdbSnapReadTimeSeriesData(pReader, ppData);
TSDB_CHECK_CODE(code, lino, _exit);
if (*ppData) {
goto _exit;
@ -940,7 +887,7 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
// read del file
if (!pReader->delDone) {
code = tsdbSnapReadDel(pReader, ppData);
code = tsdbSnapReadTombstoneData(pReader, ppData);
TSDB_CHECK_CODE(code, lino, _exit);
if (*ppData) {
goto _exit;
@ -951,10 +898,9 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
_exit:
if (code) {
tsdbError("vgId:%d, %s failed since %s, path:%s", TD_VID(pReader->pTsdb->pVnode), __func__, tstrerror(code),
pReader->pTsdb->path);
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbDebug("vgId:%d, %s done, path:%s", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->pTsdb->path);
tsdbDebug("vgId:%d %s done", TD_VID(pReader->pTsdb->pVnode), __func__);
}
return code;
}
@ -1008,69 +954,6 @@ struct STsdbSnapWriter {
};
// SNAP_DATA_TSDB
extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg);
static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) {
int32_t code = 0;
int32_t lino = 0;
#if 0
ASSERT(pWriter->dReader.iRow >= pWriter->dReader.bData.nRow);
if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) {
pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx);
code = tsdbReadDataBlk(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk);
if (code) goto _exit;
pWriter->dReader.iBlockIdx++;
} else {
pWriter->dReader.pBlockIdx = NULL;
tMapDataReset(&pWriter->dReader.mDataBlk);
}
pWriter->dReader.iDataBlk = 0; // point to the next one
tBlockDataReset(&pWriter->dReader.bData);
pWriter->dReader.iRow = 0;
#endif
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) {
int32_t code = 0;
int32_t lino = 0;
#if 0
while (true) {
if (pWriter->dReader.pBlockIdx == NULL) break;
if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break;
SBlockIdx blkIdx = *pWriter->dReader.pBlockIdx;
code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dReader.mDataBlk, &blkIdx);
if (code) goto _exit;
if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blkIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tsdbSnapNextTableData(pWriter);
if (code) goto _exit;
}
#endif
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) {
int32_t code = 0;
int32_t lino = 0;
@ -1622,14 +1505,10 @@ static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID*
pWriter->tbid = (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX};
}
taosArrayClear(pWriter->aDelData);
if (pWriter->pTIter) {
ASSERT(pWriter->pTIter->tIter.iDelData >= taosArrayGetSize(pWriter->pTIter->tIter.aDelData));
for (;;) {
if (pWriter->pTIter->tIter.iDelIdx >= taosArrayGetSize(pWriter->pTIter->tIter.aDelIdx)) {
break;
}
while (pWriter->pTIter->tIter.iDelIdx < taosArrayGetSize(pWriter->pTIter->tIter.aDelIdx)) {
SDelIdx* pDelIdx = taosArrayGet(pWriter->pTIter->tIter.aDelIdx, pWriter->pTIter->tIter.iDelIdx);
int32_t c = tTABLEIDCmprFn(pDelIdx, &pWriter->tbid);
@ -1637,7 +1516,7 @@ static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID*
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData);
TSDB_CHECK_CODE(code, lino, _exit);
SDelIdx* pDelIdxNew = taosArrayReserve(pWriter->pTIter->tIter.aDelIdx, 1);
SDelIdx* pDelIdxNew = taosArrayReserve(pWriter->aDelIdx, 1);
if (pDelIdxNew == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
@ -1651,27 +1530,17 @@ static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID*
pWriter->pTIter->tIter.iDelIdx++;
} else if (c == 0) {
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayAddBatch(pWriter->aDelData, pWriter->pTIter->tIter.aDelData->pData,
taosArrayGetSize(pWriter->pTIter->tIter.aDelData)) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pWriter->pTIter->tIter.iDelData = taosArrayGetSize(pWriter->pTIter->tIter.aDelData);
pWriter->pTIter->tIter.iDelIdx++;
break;
} else {
pWriter->pTIter->tIter.iDelData = taosArrayGetSize(pWriter->pTIter->tIter.aDelData);
break;
}
}
}
taosArrayClear(pWriter->aDelData);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
@ -1735,7 +1604,6 @@ static int32_t tsdbSnapWriteDelTableData(STsdbSnapWriter* pWriter, TABLEID* pId,
TSDB_CHECK_CODE(code, lino, _exit);
}
}
ASSERT(n == size);
_exit:
@ -1791,7 +1659,7 @@ static int32_t tsdbSnapWriteDelDataEnd(STsdbSnapWriter* pWriter) {
STsdb* pTsdb = pWriter->pTsdb;
// end remaining table (TODO)
// end remaining table with NULL data
code = tsdbSnapWriteDelTableData(pWriter, NULL, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
@ -1813,6 +1681,11 @@ static int32_t tsdbSnapWriteDelDataEnd(STsdbSnapWriter* pWriter) {
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pWriter->pTIter) {
tsdbCloseDataIter2(pWriter->pTIter);
pWriter->pTIter = NULL;
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
@ -1834,6 +1707,7 @@ static int32_t tsdbSnapWriteDelData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr
TSDB_CHECK_CODE(code, lino, _exit);
}
// do write del data
code = tsdbSnapWriteDelTableData(pWriter, (TABLEID*)pHdr->data, pHdr->data + sizeof(TABLEID),
pHdr->size - sizeof(TABLEID));
TSDB_CHECK_CODE(code, lino, _exit);