more code
This commit is contained in:
parent
cfa666f247
commit
eb2fb724b1
|
@ -266,7 +266,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMa
|
||||||
int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL);
|
int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL);
|
||||||
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg);
|
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg);
|
||||||
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData);
|
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData);
|
||||||
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData);
|
int32_t tsdbReadLastBlock(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData);
|
||||||
// SDelFWriter
|
// SDelFWriter
|
||||||
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
|
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
|
||||||
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync);
|
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync);
|
||||||
|
|
|
@ -503,7 +503,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
case SFSLASTNEXTROW_BLOCKDATA:
|
case SFSLASTNEXTROW_BLOCKDATA:
|
||||||
code = tsdbReadLastBlock(state->pDataFReader, state->pBlockL, state->pBlockDataL);
|
code = tsdbReadLastBlock(state->pDataFReader, 0, state->pBlockL, state->pBlockDataL);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
state->nRow = state->blockDataL.nRow;
|
state->nRow = state->blockDataL.nRow;
|
||||||
|
|
|
@ -128,8 +128,8 @@ typedef struct {
|
||||||
struct {
|
struct {
|
||||||
SDataFReader *pReader;
|
SDataFReader *pReader;
|
||||||
SArray *aBlockIdx;
|
SArray *aBlockIdx;
|
||||||
|
SLDataIter aLDataiter[TSDB_MAX_LAST_FILE];
|
||||||
SDataMerger merger;
|
SDataMerger merger;
|
||||||
SArray *aBlockL[TSDB_MAX_LAST_FILE];
|
|
||||||
} dReader;
|
} dReader;
|
||||||
struct {
|
struct {
|
||||||
SDataFWriter *pWriter;
|
SDataFWriter *pWriter;
|
||||||
|
@ -140,6 +140,9 @@ typedef struct {
|
||||||
} dWriter;
|
} dWriter;
|
||||||
} STsdbMerger;
|
} STsdbMerger;
|
||||||
|
|
||||||
|
extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL,
|
||||||
|
SBlockData *pBlockData); // todo
|
||||||
|
|
||||||
static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) {
|
static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STsdb *pTsdb = pMerger->pTsdb;
|
STsdb *pTsdb = pMerger->pTsdb;
|
||||||
|
@ -151,9 +154,42 @@ static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) {
|
||||||
code = tsdbReadBlockIdx(pMerger->dReader.pReader, pMerger->dReader.aBlockIdx);
|
code = tsdbReadBlockIdx(pMerger->dReader.pReader, pMerger->dReader.aBlockIdx);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
|
pMerger->dReader.merger.pNode = NULL;
|
||||||
|
pMerger->dReader.merger.rbt = tRBTreeCreate(tRowInfoCmprFn);
|
||||||
for (int8_t iLast = 0; iLast < pSet->nLastF; iLast++) {
|
for (int8_t iLast = 0; iLast < pSet->nLastF; iLast++) {
|
||||||
code = tsdbReadBlockL(pMerger->dReader.pReader, iLast, pMerger->dReader.aBlockL[iLast]);
|
SRBTreeNode *pNode = (SRBTreeNode *)taosMemoryCalloc(1, sizeof(*pNode) + sizeof(SLDataIter));
|
||||||
|
if (pNode == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
SLDataIter *pIter = (SLDataIter *)pNode->payload;
|
||||||
|
|
||||||
|
pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL));
|
||||||
|
if (pIter->aBlockL == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
code = tBlockDataCreate(&pIter->bData);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
|
code = tsdbReadBlockL(pMerger->dReader.pReader, iLast, pIter->aBlockL);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
if (taosArrayGetSize(pIter->aBlockL) == 0) continue;
|
||||||
|
pIter->iBlockL = 0;
|
||||||
|
|
||||||
|
SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, 0);
|
||||||
|
code = tsdbReadLastBlockEx(pMerger->dReader.pReader, iLast, pBlockL, &pIter->bData);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
pIter->iRow = 0;
|
||||||
|
pIter->rowInfo.suid = pIter->bData.suid;
|
||||||
|
pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
|
||||||
|
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, 0);
|
||||||
|
|
||||||
|
pNode = tRBTreePut(&pMerger->dReader.merger.rbt, pNode);
|
||||||
|
ASSERT(pNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
// writer
|
// writer
|
||||||
|
@ -261,13 +297,13 @@ static int32_t tsdbStartMerge(STsdbMerger *pMerger, STsdb *pTsdb) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) {
|
// for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) {
|
||||||
pMerger->dReader.aBlockL[iLast] = taosArrayInit(0, sizeof(SBlockL));
|
// pMerger->dReader.aBlockL[iLast] = taosArrayInit(0, sizeof(SBlockL));
|
||||||
if (pMerger->dReader.aBlockL[iLast] == NULL) {
|
// if (pMerger->dReader.aBlockL[iLast] == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
// code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
// goto _exit;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
// writer
|
// writer
|
||||||
pMerger->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
pMerger->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
||||||
|
@ -305,9 +341,9 @@ static int32_t tsdbEndMerge(STsdbMerger *pMerger) {
|
||||||
taosArrayDestroy(pMerger->dWriter.aBlockIdx);
|
taosArrayDestroy(pMerger->dWriter.aBlockIdx);
|
||||||
|
|
||||||
// reader
|
// reader
|
||||||
for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) {
|
// for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) {
|
||||||
taosArrayDestroy(pMerger->dReader.aBlockL[iLast]);
|
// taosArrayDestroy(pMerger->dReader.aBlockL[iLast]);
|
||||||
}
|
// }
|
||||||
taosArrayDestroy(pMerger->dReader.aBlockIdx);
|
taosArrayDestroy(pMerger->dReader.aBlockIdx);
|
||||||
tsdbFSDestroy(&pMerger->fs);
|
tsdbFSDestroy(&pMerger->fs);
|
||||||
|
|
||||||
|
|
|
@ -2391,7 +2391,7 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData);
|
code = tsdbReadLastBlock(pReader->pFileReader, 0, pBlock, &pLastBlockReader->lastBlockData);
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -908,7 +908,7 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData) {
|
int32_t tsdbReadLastBlock(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
code = tsdbReadBlockDataImpl(pReader, &pBlockL->bInfo, 1, pBlockData);
|
code = tsdbReadBlockDataImpl(pReader, &pBlockL->bInfo, 1, pBlockData);
|
||||||
|
@ -921,6 +921,21 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
// read
|
||||||
|
code = tsdbReadAndCheck(pReader->aLastFD[iLast], pBlockL->bInfo.offset, &pReader->aBuf[1], pBlockL->bInfo.szBlock, 0);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
// decmpr
|
||||||
|
code = tDecmprBlockData(pReader->aBuf[1], pBlockL->bInfo.szBlock, pBlockData, &pReader->aBuf[1]);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
// SDataFWriter ====================================================
|
// SDataFWriter ====================================================
|
||||||
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) {
|
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
|
@ -1619,7 +1619,7 @@ _exit:
|
||||||
int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]) {
|
int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
tBlockDataClear(pBlockData);
|
tBlockDataReset(pBlockData);
|
||||||
|
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
SDiskDataHdr hdr = {0};
|
SDiskDataHdr hdr = {0};
|
||||||
|
|
Loading…
Reference in New Issue