Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact
This commit is contained in:
commit
9a123ba608
|
@ -126,6 +126,7 @@ int32_t tBlockCmprFn(const void *p1, const void *p2);
|
||||||
void tBlockIdxReset(SBlockIdx *pBlockIdx);
|
void tBlockIdxReset(SBlockIdx *pBlockIdx);
|
||||||
int32_t tPutBlockIdx(uint8_t *p, void *ph);
|
int32_t tPutBlockIdx(uint8_t *p, void *ph);
|
||||||
int32_t tGetBlockIdx(uint8_t *p, void *ph);
|
int32_t tGetBlockIdx(uint8_t *p, void *ph);
|
||||||
|
int32_t tCmprBlockIdx(void const *lhs, void const *rhs);
|
||||||
// SColdata
|
// SColdata
|
||||||
#define tColDataInit() ((SColData){0})
|
#define tColDataInit() ((SColData){0})
|
||||||
void tColDataReset(SColData *pColData, int16_t cid, int8_t type);
|
void tColDataReset(SColData *pColData, int16_t cid, int8_t type);
|
||||||
|
@ -141,9 +142,9 @@ void tBlockDataReset(SBlockData *pBlockData);
|
||||||
void tBlockDataClear(SBlockData *pBlockData);
|
void tBlockDataClear(SBlockData *pBlockData);
|
||||||
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
|
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
// SDelIdx
|
// SDelIdx
|
||||||
int32_t tCmprDelIdx(void const *lhs, void const *rhs);
|
|
||||||
int32_t tPutDelIdx(uint8_t *p, void *ph);
|
int32_t tPutDelIdx(uint8_t *p, void *ph);
|
||||||
int32_t tGetDelIdx(uint8_t *p, void *ph);
|
int32_t tGetDelIdx(uint8_t *p, void *ph);
|
||||||
|
int32_t tCmprDelIdx(void const *lhs, void const *rhs);
|
||||||
// SDelData
|
// SDelData
|
||||||
int32_t tPutDelData(uint8_t *p, void *ph);
|
int32_t tPutDelData(uint8_t *p, void *ph);
|
||||||
int32_t tGetDelData(uint8_t *p, void *ph);
|
int32_t tGetDelData(uint8_t *p, void *ph);
|
||||||
|
|
|
@ -102,8 +102,8 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
|
||||||
return suid;
|
return suid;
|
||||||
}
|
}
|
||||||
/*
|
/*
|
||||||
static int32_t getMemLastRow(SMemTable *mem, tb_uid_t suid, tb_uid_t uid, STSRow **ppRow) {
|
static int32_t getMemLastRow(SMemTable *mem, tb_uid_t suid, tb_uid_t uid, STSRow
|
||||||
int32_t code = 0;
|
**ppRow) { int32_t code = 0;
|
||||||
|
|
||||||
if (mem) {
|
if (mem) {
|
||||||
STbData *pMem = NULL;
|
STbData *pMem = NULL;
|
||||||
|
@ -218,20 +218,28 @@ static int32_t getTableDelIdx(SDelFReader *pDelFReader, tb_uid_t suid, tb_uid_t
|
||||||
_err:
|
_err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
#if 0
|
||||||
static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFileSet *pFileSet, SArray *pSkyline,
|
static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFileSet *pFileSet, SArray *pSkyline,
|
||||||
STsdb *pTsdb, STSRow **pLastRow) {
|
STsdb *pTsdb, STSRow **ppLastRow) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
TSDBROW *pMemRow = NULL;
|
TSDBROW *pMemRow = NULL;
|
||||||
TSDBROW *pIMemRow = NULL;
|
TSDBROW *pIMemRow = NULL;
|
||||||
|
TSDBKEY memKey = TSDBKEY_MIN;
|
||||||
|
TSDBKEY imemKey = TSDBKEY_MIN;
|
||||||
|
|
||||||
if (iter != NULL) {
|
if (iter != NULL) {
|
||||||
pMemRow = tsdbTbDataIterGet(iter);
|
pMemRow = tsdbTbDataIterGet(iter);
|
||||||
|
if (pMemRow) {
|
||||||
|
memKey = tsdbRowKey(pMemRow);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (iter != NULL) {
|
if (iter != NULL) {
|
||||||
pIMemRow = tsdbTbDataIterGet(iiter);
|
pIMemRow = tsdbTbDataIterGet(iiter);
|
||||||
|
if (pIMemRow) {
|
||||||
|
imemKey = tsdbRowKey(pIMemRow);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataFReader *pDataFReader;
|
SDataFReader *pDataFReader;
|
||||||
|
@ -243,42 +251,266 @@ static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFile
|
||||||
code = tsdbReadBlockIdx(pDataFReader, &blockIdxMap, NULL);
|
code = tsdbReadBlockIdx(pDataFReader, &blockIdxMap, NULL);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
SBlockData *pBlockData;
|
SBlockIdx blockIdx = {0};
|
||||||
|
tBlockIdxReset(&blockIdx);
|
||||||
|
code = tMapDataSearch(&blockIdxMap, pBlockIdx, tGetBlockIdx, tCmprBlockIdx, &blockIdx);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
SMapData blockMap = {0};
|
||||||
|
tMapDataReset(&blockMap);
|
||||||
|
code = tsdbReadBlock(pDataFReader, &blockIdx, &blockMap, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
int32_t nBlock = blockMap.nItem;
|
||||||
|
for (int32_t iBlock = nBlock - 1; iBlock >= 0; --iBlock) {
|
||||||
|
SBlock block = {0};
|
||||||
|
SBlockData blockData = {0};
|
||||||
|
|
||||||
|
tBlockReset(&block);
|
||||||
|
tBlockDataReset(&blockData);
|
||||||
|
|
||||||
|
tMapDataGetItemByIdx(&blockMap, iBlock, &block, tGetBlock);
|
||||||
|
|
||||||
|
code = tsdbReadBlockData(pDataFReader, &blockIdx, &block, &blockData, NULL, 0, NULL, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
int32_t nRow = blockData.nRow;
|
||||||
|
for (int32_t iRow = nRow - 1; iRow >= 0; --iRow) {
|
||||||
|
TSDBROW row = tsdbRowFromBlockData(&blockData, iRow);
|
||||||
|
|
||||||
|
TSDBKEY key = tsdbRowKey(&row);
|
||||||
|
if (pMemRow != NULL && pIMemRow != NULL) {
|
||||||
|
int32_t c = tsdbKeyCmprFn(memKey, imemKey);
|
||||||
|
if (c < 0) {
|
||||||
|
} else if (c > 0) {
|
||||||
|
} else {
|
||||||
|
}
|
||||||
|
} else if (pMemRow != NULL) {
|
||||||
|
pMemRow = tsdbTbDataIterGet(iter);
|
||||||
|
|
||||||
|
} else if (pIMemRow != NULL) {
|
||||||
|
} else {
|
||||||
|
if (!tsdbKeyDeleted(key, pSkyline)) {
|
||||||
|
*ppLastRow = buildTsrowFromTsdbrow(&row);
|
||||||
|
goto _done;
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// select current row if outside delete area
|
||||||
|
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_done:
|
||||||
tsdbDataFReaderClose(&pDataFReader);
|
tsdbDataFReaderClose(&pDataFReader);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
typedef enum SFSNEXTROWSTATES {
|
||||||
|
SFSNEXTROW_FS,
|
||||||
|
SFSNEXTROW_FILESET,
|
||||||
|
SFSNEXTROW_BLOCKDATA,
|
||||||
|
SFSNEXTROW_BLOCKROW
|
||||||
|
} SFSNEXTROWSTATES;
|
||||||
|
|
||||||
|
typedef struct SFSNextRowIter {
|
||||||
|
SFSNEXTROWSTATES state; // [input]
|
||||||
|
STsdb *pTsdb; // [input]
|
||||||
|
SBlockIdx *pBlockIdxExp; // [input]
|
||||||
|
int32_t nFileSet;
|
||||||
|
int32_t iFileSet;
|
||||||
|
SArray *aDFileSet;
|
||||||
|
SDataFReader *pDataFReader;
|
||||||
|
SMapData blockIdxMap;
|
||||||
|
SBlockIdx blockIdx;
|
||||||
|
SMapData blockMap;
|
||||||
|
int32_t nBlock;
|
||||||
|
int32_t iBlock;
|
||||||
|
SBlock block;
|
||||||
|
SBlockData blockData;
|
||||||
|
int32_t nRow;
|
||||||
|
int32_t iRow;
|
||||||
|
TSDBROW row;
|
||||||
|
} SFSNextRowIter;
|
||||||
|
|
||||||
|
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
||||||
|
SFSNextRowIter *state = (SFSNextRowIter *)iter;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
switch (state->state) {
|
||||||
|
case SFSNEXTROW_FS:
|
||||||
|
state->aDFileSet = state->pTsdb->fs->cState->aDFileSet;
|
||||||
|
state->nFileSet = taosArrayGetSize(state->aDFileSet);
|
||||||
|
state->iFileSet = state->nFileSet;
|
||||||
|
|
||||||
|
case SFSNEXTROW_FILESET: {
|
||||||
|
SDFileSet *pFileSet = NULL;
|
||||||
|
if (--state->iFileSet >= 0) {
|
||||||
|
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
|
||||||
|
} else {
|
||||||
|
*ppRow = NULL;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
tMapDataReset(&state->blockIdxMap);
|
||||||
|
code = tsdbReadBlockIdx(state->pDataFReader, &state->blockIdxMap, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
tBlockIdxReset(&state->blockIdx);
|
||||||
|
code = tMapDataSearch(&state->blockIdxMap, state->pBlockIdxExp, tGetBlockIdx, tCmprBlockIdx, &state->blockIdx);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
tMapDataReset(&state->blockMap);
|
||||||
|
code = tsdbReadBlock(state->pDataFReader, &state->blockIdx, &state->blockMap, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
state->nBlock = state->blockMap.nItem;
|
||||||
|
state->iBlock = state->nBlock - 1;
|
||||||
|
}
|
||||||
|
case SFSNEXTROW_BLOCKDATA:
|
||||||
|
if (state->iBlock >= 0) {
|
||||||
|
SBlock block = {0};
|
||||||
|
|
||||||
|
tBlockReset(&block);
|
||||||
|
tBlockDataReset(&state->blockData);
|
||||||
|
|
||||||
|
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetBlock);
|
||||||
|
code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, 0, NULL, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
state->nRow = state->blockData.nRow;
|
||||||
|
state->iRow = state->nRow - 1;
|
||||||
|
|
||||||
|
state->state = SFSNEXTROW_BLOCKROW;
|
||||||
|
}
|
||||||
|
case SFSNEXTROW_BLOCKROW:
|
||||||
|
if (state->iRow >= 0) {
|
||||||
|
state->row = tsdbRowFromBlockData(&state->blockData, state->iRow);
|
||||||
|
*ppRow = &state->row;
|
||||||
|
|
||||||
|
if (--state->iRow < 0) {
|
||||||
|
state->state = SFSNEXTROW_BLOCKDATA;
|
||||||
|
if (--state->iBlock < 0) {
|
||||||
|
tsdbDataFReaderClose(&state->pDataFReader);
|
||||||
|
|
||||||
|
state->state = SFSNEXTROW_FILESET;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
default:
|
||||||
|
ASSERT(0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
_err:
|
||||||
|
*ppRow = NULL;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef enum SMEMNEXTROWSTATES {
|
||||||
|
SMEMNEXTROW_ENTER,
|
||||||
|
SMEMNEXTROW_NEXT,
|
||||||
|
} SMEMNEXTROWSTATES;
|
||||||
|
|
||||||
|
typedef struct SMemNextRowIter {
|
||||||
|
SMEMNEXTROWSTATES state;
|
||||||
|
STbData *pMem; // [input]
|
||||||
|
STbDataIter iter; // mem buffer skip list iterator
|
||||||
|
} SMemNextRowIter;
|
||||||
|
|
||||||
|
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow) {
|
||||||
|
SMemNextRowIter *state = (SMemNextRowIter *)iter;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
switch (state->state) {
|
||||||
|
case SMEMNEXTROW_ENTER: {
|
||||||
|
if (state->pMem != NULL) {
|
||||||
|
tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
|
||||||
|
|
||||||
|
TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
|
||||||
|
if (pMemRow) {
|
||||||
|
*ppRow = pMemRow;
|
||||||
|
state->state = SMEMNEXTROW_NEXT;
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppRow = NULL;
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
case SMEMNEXTROW_NEXT:
|
||||||
|
if (tsdbTbDataIterNext(&state->iter)) {
|
||||||
|
*ppRow = tsdbTbDataIterGet(&state->iter);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
} else {
|
||||||
|
*ppRow = NULL;
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
ASSERT(0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
_err:
|
||||||
|
*ppRow = NULL;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static STSRow *tsRowFromTsdbRow(TSDBROW *pRow) {
|
||||||
|
// TODO: new tsrow from tsdbrow
|
||||||
|
STSRow *ret = NULL;
|
||||||
|
if (pRow->type == 0) {
|
||||||
|
return pRow->pTSRow;
|
||||||
|
} else {
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow);
|
||||||
|
|
||||||
|
typedef struct TsdbNextRowState {
|
||||||
|
TSDBROW *pRow;
|
||||||
|
bool stop;
|
||||||
|
bool next;
|
||||||
|
void *iter;
|
||||||
|
_next_row_fn_t nextRowFn;
|
||||||
|
} TsdbNextRowState;
|
||||||
|
|
||||||
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
||||||
|
|
||||||
STbData *pMem = NULL;
|
STbData *pMem = NULL;
|
||||||
STbData *pIMem = NULL;
|
|
||||||
STbDataIter iter; // mem buffer skip list iterator
|
|
||||||
STbDataIter iiter; // imem buffer skip list iterator
|
|
||||||
|
|
||||||
if (pTsdb->mem) {
|
if (pTsdb->mem) {
|
||||||
tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
|
tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
|
||||||
if (pMem != NULL) {
|
|
||||||
tsdbTbDataIterOpen(pMem, NULL, 1, &iter);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STbData *pIMem = NULL;
|
||||||
if (pTsdb->imem) {
|
if (pTsdb->imem) {
|
||||||
tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
|
tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
|
||||||
if (pIMem != NULL) {
|
|
||||||
tsdbTbDataIterOpen(pIMem, NULL, 1, &iiter);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
*ppRow = NULL;
|
*ppRow = NULL;
|
||||||
|
|
||||||
SDelFReader *pDelFReader;
|
SDelFReader *pDelFReader;
|
||||||
// code = tsdbDelFReaderOpen(&pDelFReader, pTsdb->fs->cState->pDelFile, pTsdb, NULL);
|
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
|
||||||
|
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
SDelIdx delIdx;
|
SDelIdx delIdx;
|
||||||
|
@ -288,30 +520,112 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||||
SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
|
SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
|
||||||
code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline);
|
code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
/*
|
|
||||||
SFSIter fsiter;
|
|
||||||
bool fsHasNext = false;
|
|
||||||
|
|
||||||
tsdbFSIterOpen(pTsdb->fs, TSDB_FS_ITER_BACKWARD, &fsiter);
|
int iSkyline = taosArrayGetSize(pSkyline) - 1;
|
||||||
do {
|
|
||||||
*/
|
|
||||||
SDFileSet *pFileSet = NULL;
|
|
||||||
// pFileSet = tsdbFSIterGet(fsiter);
|
|
||||||
|
|
||||||
code = mergeLastRowFileSet(&iter, &iiter, pFileSet, pSkyline, pTsdb, ppRow);
|
|
||||||
if (code < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (*ppRow != NULL) {
|
|
||||||
// break;
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
} while (fsHasNext = tsdbFSIterNext(fsiter))
|
|
||||||
*/
|
|
||||||
|
|
||||||
tsdbDelFReaderClose(pDelFReader);
|
tsdbDelFReaderClose(pDelFReader);
|
||||||
|
|
||||||
|
SBlockIdx idx = {.suid = suid, .uid = uid};
|
||||||
|
|
||||||
|
SFSNextRowIter fsState = {0};
|
||||||
|
fsState.state = SFSNEXTROW_FS;
|
||||||
|
fsState.pTsdb = pTsdb;
|
||||||
|
fsState.pBlockIdxExp = &idx;
|
||||||
|
|
||||||
|
SMemNextRowIter memState = {0};
|
||||||
|
SMemNextRowIter imemState = {0};
|
||||||
|
TSDBROW memRow, imemRow, fsRow;
|
||||||
|
|
||||||
|
TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem},
|
||||||
|
{&imemRow, true, false, &imemState, getNextRowFromMem},
|
||||||
|
{&fsRow, false, true, &fsState, getNextRowFromFS}};
|
||||||
|
|
||||||
|
if (pMem) {
|
||||||
|
memState.pMem = pMem;
|
||||||
|
memState.state = SMEMNEXTROW_ENTER;
|
||||||
|
input[0].stop = false;
|
||||||
|
input[0].next = true;
|
||||||
|
}
|
||||||
|
if (pIMem) {
|
||||||
|
imemState.pMem = pIMem;
|
||||||
|
imemState.state = SMEMNEXTROW_ENTER;
|
||||||
|
input[1].stop = false;
|
||||||
|
input[1].next = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
do {
|
||||||
|
for (int i = 0; i < 3; ++i) {
|
||||||
|
if (input[i].next && !input[i].stop) {
|
||||||
|
code = input[i].nextRowFn(input[i].iter, &input[i].pRow);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
if (input[i].pRow == NULL) {
|
||||||
|
input[i].stop = true;
|
||||||
|
input[i].next = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (input[0].stop && input[1].stop && input[2].stop) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// select maxpoint(s) from mem, imem, fs
|
||||||
|
TSDBROW *max[3] = {0};
|
||||||
|
int iMax[3] = {-1, -1, -1};
|
||||||
|
int nMax = 0;
|
||||||
|
for (int i = 0; i < 3; ++i) {
|
||||||
|
if (input[i].pRow != NULL) {
|
||||||
|
TSDBKEY key = TSDBROW_KEY(input[i].pRow);
|
||||||
|
TSDBKEY maxKey = TSDBROW_KEY(max[nMax]);
|
||||||
|
|
||||||
|
// merging & deduplicating on client side
|
||||||
|
if (maxKey.ts <= key.ts) {
|
||||||
|
if (maxKey.ts < key.ts) {
|
||||||
|
nMax = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
iMax[nMax] = i;
|
||||||
|
max[nMax++] = input[i].pRow;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete detection
|
||||||
|
TSDBROW *merge[3] = {0};
|
||||||
|
int nMerge = 0;
|
||||||
|
for (int i = 0; i < nMax; ++i) {
|
||||||
|
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
|
||||||
|
|
||||||
|
bool deleted = false;
|
||||||
|
// bool deleted = tsdbKeyDeleted(maxKey, pSkyline, &iSkyline);
|
||||||
|
if (!deleted) {
|
||||||
|
merge[nMerge++] = max[i];
|
||||||
|
} else {
|
||||||
|
input[iMax[i]].next = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// merge if nMerge > 1
|
||||||
|
if (nMerge > 0) {
|
||||||
|
if (nMerge == 1) {
|
||||||
|
*ppRow = tsRowFromTsdbRow(merge[nMerge]);
|
||||||
|
} else {
|
||||||
|
// merge 2 or 3 rows
|
||||||
|
SRowMerger merger = {0};
|
||||||
|
|
||||||
|
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
|
||||||
|
|
||||||
|
tRowMergerInit(&merger, merge[0], pTSchema);
|
||||||
|
for (int i = 1; i < nMerge; ++i) {
|
||||||
|
tRowMerge(&merger, merge[i]);
|
||||||
|
}
|
||||||
|
tRowMergerGetRow(&merger, ppRow);
|
||||||
|
tRowMergerClear(&merger);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (*ppRow == NULL);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
@ -349,7 +663,8 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) {
|
||||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||||
if (h) {
|
if (h) {
|
||||||
taosLRUCacheRelease(pCache, h, true);
|
taosLRUCacheRelease(pCache, h, true);
|
||||||
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
|
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t
|
||||||
|
// keyLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -627,4 +627,4 @@ SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile;
|
||||||
|
|
||||||
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid) {
|
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid) {
|
||||||
return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);
|
return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);
|
||||||
}
|
}
|
||||||
|
|
|
@ -323,6 +323,25 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) {
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tCmprBlockIdx(void const *lhs, void const *rhs) {
|
||||||
|
SBlockIdx *lBlockIdx = *(SBlockIdx **)lhs;
|
||||||
|
SBlockIdx *rBlockIdx = *(SBlockIdx **)rhs;
|
||||||
|
|
||||||
|
if (lBlockIdx->suid < lBlockIdx->suid) {
|
||||||
|
return -1;
|
||||||
|
} else if (lBlockIdx->suid > lBlockIdx->suid) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (lBlockIdx->uid < lBlockIdx->uid) {
|
||||||
|
return -1;
|
||||||
|
} else if (lBlockIdx->uid > lBlockIdx->uid) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// SBlock ======================================================
|
// SBlock ======================================================
|
||||||
void tBlockReset(SBlock *pBlock) {
|
void tBlockReset(SBlock *pBlock) {
|
||||||
pBlock->minKey = TSDBKEY_MAX;
|
pBlock->minKey = TSDBKEY_MAX;
|
||||||
|
|
Loading…
Reference in New Issue