tsdb/cache reader: remove dataf & dataf last readers
This commit is contained in:
parent
998fbe574d
commit
9f9efe9a5e
|
@ -871,8 +871,6 @@ typedef struct SCacheRowsReader {
|
||||||
int32_t numOfTables;
|
int32_t numOfTables;
|
||||||
SArray *pLDataIterArray;
|
SArray *pLDataIterArray;
|
||||||
STsdbReadSnap *pReadSnap;
|
STsdbReadSnap *pReadSnap;
|
||||||
SDataFReader *pDataFReader;
|
|
||||||
SDataFReader *pDataFReaderLast;
|
|
||||||
char *idstr;
|
char *idstr;
|
||||||
int64_t lastTs;
|
int64_t lastTs;
|
||||||
} SCacheRowsReader;
|
} SCacheRowsReader;
|
||||||
|
|
|
@ -1691,37 +1691,27 @@ _err:
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
static int32_t getTableDelIdx(SDelFReader *pDelFReader, tb_uid_t suid, tb_uid_t uid, SDelIdx *pDelIdx) {
|
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
|
||||||
|
bool isFile) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SArray *pDelIdxArray = NULL;
|
|
||||||
|
|
||||||
// SMapData delIdxMap;
|
return TSDB_CODE_SUCCESS;
|
||||||
pDelIdxArray = taosArrayInit(32, sizeof(SDelIdx));
|
|
||||||
SDelIdx idx = {.suid = suid, .uid = uid};
|
|
||||||
|
|
||||||
// tMapDataReset(&delIdxMap);
|
|
||||||
code = tsdbReadDelIdx(pDelFReader, pDelIdxArray);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
// code = tMapDataSearch(&delIdxMap, &idx, tGetDelIdx, tCmprDelIdx, pDelIdx);
|
|
||||||
SDelIdx *pIdx = taosArraySearch(pDelIdxArray, &idx, tCmprDelIdx, TD_EQ);
|
|
||||||
|
|
||||||
*pDelIdx = *pIdx;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
if (pDelIdxArray) {
|
|
||||||
taosArrayDestroy(pDelIdxArray);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
typedef struct {
|
|
||||||
SMergeTree mergeTree;
|
|
||||||
SMergeTree *pMergeTree;
|
|
||||||
} SFSLastIter;
|
|
||||||
|
|
||||||
static int32_t loadSttTombData(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
|
static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
const TTombBlkArray *pBlkArray = NULL;
|
||||||
|
code = tsdbDataFileReadTombBlk(pFileReader, &pBlkArray);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
return loadTombFromBlk(pBlkArray, pReader, pFileReader, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
SCacheRowsReader *pReader = (SCacheRowsReader *)pTsdbReader;
|
SCacheRowsReader *pReader = (SCacheRowsReader *)pTsdbReader;
|
||||||
|
@ -1736,12 +1726,14 @@ static int32_t loadSttTombData(STsdbReader *pTsdbReader, SSttFileReader *pSttFil
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
return loadTombFromBlk(pBlkArray, pReader, pSttFileReader, false);
|
||||||
return doLoadTombDataFromTombBlk(pBlkArray, pReader, pSttFileReader, false);
|
|
||||||
*/
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SMergeTree mergeTree;
|
||||||
|
SMergeTree *pMergeTree;
|
||||||
|
} SFSLastIter;
|
||||||
|
|
||||||
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
|
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
|
||||||
tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
|
tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -1764,7 +1756,7 @@ static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb
|
||||||
.pSttFileBlockIterArray = pr->pLDataIterArray,
|
.pSttFileBlockIterArray = pr->pLDataIterArray,
|
||||||
.pCols = aCols,
|
.pCols = aCols,
|
||||||
.numOfCols = nCols,
|
.numOfCols = nCols,
|
||||||
.loadTombFn = loadSttTombData,
|
.loadTombFn = loadSttTomb,
|
||||||
.pReader = pr,
|
.pReader = pr,
|
||||||
.idstr = pr->idstr,
|
.idstr = pr->idstr,
|
||||||
};
|
};
|
||||||
|
@ -2545,8 +2537,8 @@ typedef struct CacheNextRowIter {
|
||||||
} CacheNextRowIter;
|
} CacheNextRowIter;
|
||||||
|
|
||||||
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
|
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
|
||||||
SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader,
|
SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs,
|
||||||
SDataFReader **pDataFReaderLast, int64_t lastTs, SCacheRowsReader *pr) {
|
SCacheRowsReader *pr) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
||||||
STbData *pMem = NULL;
|
STbData *pMem = NULL;
|
||||||
|
@ -2630,7 +2622,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
||||||
pIter->input[2] = (TsdbNextRowState){
|
pIter->input[2] = (TsdbNextRowState){
|
||||||
&pIter->fsLastRow, false, true, false, &pIter->fsLastState, getNextRowFromFSLast, clearNextRowFromFSLast};
|
&pIter->fsLastRow, false, true, false, &pIter->fsLastState, getNextRowFromFSLast, clearNextRowFromFSLast};
|
||||||
*/
|
*/
|
||||||
pIter->input[3] =
|
pIter->input[2] =
|
||||||
(TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
|
(TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
|
||||||
|
|
||||||
if (pMem) {
|
if (pMem) {
|
||||||
|
@ -2680,7 +2672,7 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI
|
||||||
int16_t *aCols, int nCols) {
|
int16_t *aCols, int nCols) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
for (int i = 0; i < 4; ++i) {
|
for (int i = 0; i < 3; ++i) {
|
||||||
if (pIter->input[i].next && !pIter->input[i].stop) {
|
if (pIter->input[i].next && !pIter->input[i].stop) {
|
||||||
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, &pIter->input[i].ignoreEarlierTs,
|
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, &pIter->input[i].ignoreEarlierTs,
|
||||||
isLast, aCols, nCols);
|
isLast, aCols, nCols);
|
||||||
|
@ -2835,8 +2827,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
||||||
TSKEY lastRowTs = TSKEY_MAX;
|
TSKEY lastRowTs = TSKEY_MAX;
|
||||||
|
|
||||||
CacheNextRowIter iter = {0};
|
CacheNextRowIter iter = {0};
|
||||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, &pr->pDataFReader,
|
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
|
||||||
&pr->pDataFReaderLast, pr->lastTs, pr);
|
|
||||||
|
|
||||||
do {
|
do {
|
||||||
TSDBROW *pRow = NULL;
|
TSDBROW *pRow = NULL;
|
||||||
|
@ -3005,8 +2996,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
|
||||||
TSKEY lastRowTs = TSKEY_MAX;
|
TSKEY lastRowTs = TSKEY_MAX;
|
||||||
|
|
||||||
CacheNextRowIter iter = {0};
|
CacheNextRowIter iter = {0};
|
||||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, &pr->pDataFReader,
|
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
|
||||||
&pr->pDataFReaderLast, pr->lastTs, pr);
|
|
||||||
|
|
||||||
do {
|
do {
|
||||||
TSDBROW *pRow = NULL;
|
TSDBROW *pRow = NULL;
|
||||||
|
|
|
@ -300,8 +300,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
pr->pDataFReader = NULL;
|
|
||||||
pr->pDataFReaderLast = NULL;
|
|
||||||
|
|
||||||
int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
|
int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
|
||||||
|
|
||||||
|
@ -422,15 +420,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
}
|
}
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
tsdbDataFReaderClose(&pr->pDataFReaderLast);
|
tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true);
|
||||||
tsdbDataFReaderClose(&pr->pDataFReader);
|
|
||||||
|
|
||||||
int64_t loadBlocks = 0;
|
int64_t loadBlocks = 0;
|
||||||
double elapse = 0;
|
double elapse = 0;
|
||||||
pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, &loadBlocks, &elapse);
|
pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, &loadBlocks, &elapse);
|
||||||
pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
|
||||||
tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true);
|
|
||||||
taosThreadMutexUnlock(&pr->readerMutex);
|
taosThreadMutexUnlock(&pr->readerMutex);
|
||||||
|
|
||||||
if (pRes != NULL) {
|
if (pRes != NULL) {
|
||||||
|
|
Loading…
Reference in New Issue