tsdbCache/skyline: first round skyline detection
This commit is contained in:
parent
ce15ee0e41
commit
03d5b62d52
|
@ -18,7 +18,8 @@
|
||||||
int32_t tsdbOpenCache(STsdb *pTsdb) {
|
int32_t tsdbOpenCache(STsdb *pTsdb) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SLRUCache *pCache = NULL;
|
SLRUCache *pCache = NULL;
|
||||||
size_t cfgCapacity = 1024 * 1024; // TODO: get cfg from tsdb config
|
// TODO: get cfg from vnode config: pTsdb->pVnode->config.lruCapacity
|
||||||
|
size_t cfgCapacity = 1024 * 1024;
|
||||||
|
|
||||||
pCache = taosLRUCacheInit(cfgCapacity, -1, .5);
|
pCache = taosLRUCacheInit(cfgCapacity, -1, .5);
|
||||||
if (pCache == NULL) {
|
if (pCache == NULL) {
|
||||||
|
@ -61,8 +62,11 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
|
||||||
if (h) {
|
if (h) {
|
||||||
cacheRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
cacheRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||||
if (row->ts >= cacheRow->ts) {
|
if (row->ts >= cacheRow->ts) {
|
||||||
if (row->ts > cacheRow->ts) {
|
if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) {
|
||||||
tdRowCpy(cacheRow, row);
|
tdRowCpy(cacheRow, row);
|
||||||
|
} else {
|
||||||
|
tsdbCacheDeleteLastrow(pCache, uid);
|
||||||
|
tsdbCacheInsertLastrow(pCache, uid, row);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -475,13 +479,39 @@ static STSRow *tsRowFromTsdbRow(TSDBROW *pRow) {
|
||||||
// TODO: new tsrow from tsdbrow
|
// TODO: new tsrow from tsdbrow
|
||||||
STSRow *ret = NULL;
|
STSRow *ret = NULL;
|
||||||
if (pRow->type == 0) {
|
if (pRow->type == 0) {
|
||||||
return pRow->pTSRow;
|
return tdRowDup(pRow->pTSRow);
|
||||||
} else {
|
} else {
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int *iSkyline) {
|
||||||
|
bool deleted = false;
|
||||||
|
while (*iSkyline > 0) {
|
||||||
|
TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
|
||||||
|
TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);
|
||||||
|
|
||||||
|
if (key->ts > pItemBack->ts) {
|
||||||
|
return false;
|
||||||
|
} else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
|
||||||
|
if ((key->version <= pItemFront->version || key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (*iSkyline > 1) {
|
||||||
|
--*iSkyline;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return deleted;
|
||||||
|
}
|
||||||
|
|
||||||
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow);
|
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow);
|
||||||
|
|
||||||
typedef struct TsdbNextRowState {
|
typedef struct TsdbNextRowState {
|
||||||
|
@ -598,8 +628,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||||
for (int i = 0; i < nMax; ++i) {
|
for (int i = 0; i < nMax; ++i) {
|
||||||
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
|
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
|
||||||
|
|
||||||
bool deleted = false;
|
// bool deleted = false;
|
||||||
// bool deleted = tsdbKeyDeleted(maxKey, pSkyline, &iSkyline);
|
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
|
||||||
if (!deleted) {
|
if (!deleted) {
|
||||||
merge[nMerge++] = max[i];
|
merge[nMerge++] = max[i];
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue