more code

This commit is contained in:
Hongze Cheng 2022-08-26 21:57:02 +08:00
parent e25f845e97
commit 8ed246bdd3
3 changed files with 43 additions and 17 deletions

View File

@ -609,6 +609,17 @@ struct STsdbReadSnap {
STsdbFS fs;
};
struct SDataFReader {
STsdb *pTsdb;
SDFileSet *pSet;
TdFilePtr pHeadFD;
TdFilePtr pDataFD;
TdFilePtr pSmaFD;
TdFilePtr aLastFD[TSDB_MAX_LAST_FILE];
uint8_t *aBuf[3];
};
// ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1;

View File

@ -26,10 +26,12 @@ typedef struct {
TSDBROW row;
} SRowInfo;
typedef enum { MEMORY_DATA_ITER = 0, LAST_DATA_ITER } EDataIterT;
typedef struct {
SRBTreeNode n;
SRowInfo r;
int8_t type;
EDataIterT type;
union {
struct {
int32_t iTbDataP;
@ -74,7 +76,8 @@ typedef struct {
struct {
SDataIter *pIter;
SRBTree rbt;
SDataIter aDataIter[TSDB_MAX_LAST_FILE + 1];
SDataIter dataIter;
SDataIter aDataIter[TSDB_MAX_LAST_FILE];
};
struct {
SDataFWriter *pWriter;
@ -402,10 +405,9 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn);
pCommitter->pIter = NULL;
int8_t iIter = 0;
// memory
SDataIter *pIter = &pCommitter->aDataIter[iIter];
pIter->type = 0;
SDataIter *pIter = &pCommitter->dataIter;
pIter->type = MEMORY_DATA_ITER;
pIter->iTbDataP = 0;
for (; pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pIter->iTbDataP++) {
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
@ -427,7 +429,31 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);
// disk
if (0) {
SDataFReader *pReader = pCommitter->dReader.pReader;
if (pReader && pReader->pSet->nLastF >= pCommitter->maxLast) {
int8_t iIter = 0;
for (int32_t iLast = 0; iLast < pReader->pSet->nLastF; iLast++) {
pIter = &pCommitter->aDataIter[iIter];
pIter->type = LAST_DATA_ITER;
pIter->iLast = iLast;
code = tsdbReadBlockL(pCommitter->dReader.pReader, iLast, pIter->aBlockL);
if (code) goto _err;
if (taosArrayGetSize(pIter->aBlockL) == 0) continue;
pIter->iBlockL = 0;
SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, 0);
code = tsdbReadLastBlockEx(pCommitter->dReader.pReader, iLast, pBlockL, &pIter->bData);
if (code) goto _err;
pIter->r.suid = pIter->bData.suid;
pIter->r.uid = pIter->bData.uid;
pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);
tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);
iIter++;
}
}
code = tsdbNextCommitRow(pCommitter);

View File

@ -394,17 +394,6 @@ _err:
}
// SDataFReader ====================================================
struct SDataFReader {
STsdb *pTsdb;
SDFileSet *pSet;
TdFilePtr pHeadFD;
TdFilePtr pDataFD;
TdFilePtr pSmaFD;
TdFilePtr aLastFD[TSDB_MAX_LAST_FILE];
uint8_t *aBuf[3];
};
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
int32_t code = 0;
SDataFReader *pReader;