tsdbCache: new row iterator for mem/imem/fs
This commit is contained in:
parent
644963b745
commit
22eb20bb67
|
@ -238,7 +238,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf);
|
|||
// tsdbCache
|
||||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||
void tsdbCloseCache(SLRUCache *pCache);
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb);
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup);
|
||||
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
||||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
||||
|
|
|
@ -173,20 +173,64 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
|
||||
typedef struct {
|
||||
TSKEY ts;
|
||||
SColVal colVal;
|
||||
} SLastCol;
|
||||
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
STSRow *cacheRow = NULL;
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
||||
((void)(row));
|
||||
// ((void)(row));
|
||||
|
||||
// getTableCacheKey(uid, "l", key, &keyLen);
|
||||
getTableCacheKey(uid, 1, key, &keyLen);
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (h) {
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
|
||||
TSKEY keyTs = row->ts;
|
||||
bool invalidate = false;
|
||||
|
||||
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
|
||||
int16_t nCol = taosArrayGetSize(pLast);
|
||||
int16_t iCol = 0;
|
||||
|
||||
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||
if (keyTs > tTsVal->ts) {
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = keyTs});
|
||||
|
||||
taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal});
|
||||
}
|
||||
|
||||
for (++iCol; iCol < nCol; ++iCol) {
|
||||
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||
if (keyTs >= tTsVal->ts) {
|
||||
SColVal *tColVal = &tTsVal->colVal;
|
||||
|
||||
SColVal colVal = {0};
|
||||
tTSRowGetVal(row, pTSchema, iCol, &colVal);
|
||||
if (colVal.isNone || colVal.isNull) {
|
||||
if (keyTs == tTsVal->ts && !tColVal->isNone && !tColVal->isNull) {
|
||||
invalidate = true;
|
||||
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = colVal});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
|
||||
taosLRUCacheRelease(pCache, h, invalidate);
|
||||
|
||||
// clear last cache anyway, lazy load when get last lookup
|
||||
taosLRUCacheRelease(pCache, h, true);
|
||||
// taosLRUCacheRelease(pCache, h, true);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -516,12 +560,46 @@ typedef struct SMemNextRowIter {
|
|||
SMEMNEXTROWSTATES state;
|
||||
STbData *pMem; // [input]
|
||||
STbDataIter iter; // mem buffer skip list iterator
|
||||
// bool iterOpened;
|
||||
// TSDBROW *curRow;
|
||||
} SMemNextRowIter;
|
||||
|
||||
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow) {
|
||||
// static int32_t getNextRowFromMem(void *iter, SArray *pRowArray) {
|
||||
SMemNextRowIter *state = (SMemNextRowIter *)iter;
|
||||
int32_t code = 0;
|
||||
/*
|
||||
if (!state->iterOpened) {
|
||||
if (state->pMem != NULL) {
|
||||
tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
|
||||
|
||||
state->iterOpened = true;
|
||||
|
||||
TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
|
||||
if (pMemRow) {
|
||||
state->curRow = pMemRow;
|
||||
} else {
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayPush(pRowArray, state->curRow);
|
||||
while (tsdbTbDataIterNext(&state->iter)) {
|
||||
TSDBROW *row = tsdbTbDataIterGet(&state->iter);
|
||||
|
||||
if (TSDBROW_TS(row) < TSDBROW_TS(state->curRow)) {
|
||||
state->curRow = row;
|
||||
break;
|
||||
} else {
|
||||
taosArrayPush(pRowArray, row);
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
*/
|
||||
switch (state->state) {
|
||||
case SMEMNEXTROW_ENTER: {
|
||||
if (state->pMem != NULL) {
|
||||
|
@ -599,7 +677,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int *iSkyline) {
|
||||
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
|
||||
bool deleted = false;
|
||||
while (*iSkyline > 0) {
|
||||
TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
|
||||
|
@ -626,9 +704,11 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int *iSkyline) {
|
|||
}
|
||||
|
||||
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow);
|
||||
// typedef int32_t (*_next_row_fn_t)(void *iter, SArray *pRowArray);
|
||||
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
|
||||
|
||||
typedef struct TsdbNextRowState {
|
||||
// typedef struct TsdbNextRowState {
|
||||
typedef struct {
|
||||
TSDBROW *pRow;
|
||||
bool stop;
|
||||
bool next;
|
||||
|
@ -637,6 +717,388 @@ typedef struct TsdbNextRowState {
|
|||
_next_row_clear_fn_t nextRowClearFn;
|
||||
} TsdbNextRowState;
|
||||
|
||||
typedef struct {
|
||||
// STsdb *pTsdb;
|
||||
SArray *pSkyline;
|
||||
int64_t iSkyline;
|
||||
|
||||
SBlockIdx idx;
|
||||
SMemNextRowIter memState;
|
||||
SMemNextRowIter imemState;
|
||||
SFSNextRowIter fsState;
|
||||
TSDBROW memRow, imemRow, fsRow;
|
||||
|
||||
TsdbNextRowState input[3];
|
||||
} CacheNextRowIter;
|
||||
|
||||
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb) {
|
||||
int code = 0;
|
||||
|
||||
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
||||
|
||||
STbData *pMem = NULL;
|
||||
if (pTsdb->mem) {
|
||||
tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
|
||||
}
|
||||
|
||||
STbData *pIMem = NULL;
|
||||
if (pTsdb->imem) {
|
||||
tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
|
||||
}
|
||||
|
||||
pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
|
||||
|
||||
SDelIdx delIdx;
|
||||
|
||||
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
|
||||
if (pDelFile) {
|
||||
SDelFReader *pDelFReader;
|
||||
|
||||
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
|
||||
if (code) goto _err;
|
||||
|
||||
code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pIter->pSkyline);
|
||||
if (code) goto _err;
|
||||
|
||||
tsdbDelFReaderClose(&pDelFReader);
|
||||
} else {
|
||||
code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pIter->pSkyline);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
|
||||
|
||||
pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
|
||||
|
||||
pIter->fsState.state = SFSNEXTROW_FS;
|
||||
pIter->fsState.pTsdb = pTsdb;
|
||||
pIter->fsState.pBlockIdxExp = &pIter->idx;
|
||||
|
||||
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL};
|
||||
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL};
|
||||
pIter->input[2] =
|
||||
(TsdbNextRowState){&pIter->fsRow, false, true, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
|
||||
|
||||
if (pMem) {
|
||||
pIter->memState.pMem = pMem;
|
||||
pIter->memState.state = SMEMNEXTROW_ENTER;
|
||||
pIter->input[0].stop = false;
|
||||
pIter->input[0].next = true;
|
||||
}
|
||||
|
||||
if (pIMem) {
|
||||
pIter->imemState.pMem = pIMem;
|
||||
pIter->imemState.state = SMEMNEXTROW_ENTER;
|
||||
pIter->input[1].stop = false;
|
||||
pIter->input[1].next = true;
|
||||
}
|
||||
|
||||
return code;
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
|
||||
int code = 0;
|
||||
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (pIter->input[i].nextRowClearFn) {
|
||||
pIter->input[i].nextRowClearFn(pIter->input[i].iter);
|
||||
}
|
||||
}
|
||||
|
||||
if (pIter->pSkyline) {
|
||||
taosArrayDestroy(pIter->pSkyline);
|
||||
}
|
||||
|
||||
return code;
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
// iterate next row non deleted backward ts, version (from high to low)
|
||||
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
|
||||
int code = 0;
|
||||
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (pIter->input[i].next && !pIter->input[i].stop) {
|
||||
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pIter->input[i].pRow == NULL) {
|
||||
pIter->input[i].stop = true;
|
||||
pIter->input[i].next = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
|
||||
*ppRow = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
// select maxpoint(s) from mem, imem, fs
|
||||
TSDBROW *max[3] = {0};
|
||||
int iMax[3] = {-1, -1, -1};
|
||||
int nMax = 0;
|
||||
TSKEY maxKey = TSKEY_MIN;
|
||||
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
|
||||
TSDBKEY key = TSDBROW_KEY(pIter->input[i].pRow);
|
||||
|
||||
// merging & deduplicating on client side
|
||||
if (maxKey <= key.ts) {
|
||||
if (maxKey < key.ts) {
|
||||
nMax = 0;
|
||||
maxKey = key.ts;
|
||||
}
|
||||
|
||||
iMax[nMax] = i;
|
||||
max[nMax++] = pIter->input[i].pRow;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// delete detection
|
||||
TSDBROW *merge[3] = {0};
|
||||
int iMerge[3] = {-1, -1, -1};
|
||||
int nMerge = 0;
|
||||
for (int i = 0; i < nMax; ++i) {
|
||||
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
|
||||
|
||||
bool deleted = tsdbKeyDeleted(&maxKey, pIter->pSkyline, &pIter->iSkyline);
|
||||
if (!deleted) {
|
||||
iMerge[nMerge] = iMax[i];
|
||||
merge[nMerge++] = max[i];
|
||||
}
|
||||
|
||||
pIter->input[iMax[i]].next = deleted;
|
||||
}
|
||||
|
||||
if (nMerge > 0) {
|
||||
pIter->input[iMerge[0]].next = true;
|
||||
|
||||
*ppRow = merge[0];
|
||||
} else {
|
||||
*ppRow = NULL;
|
||||
}
|
||||
|
||||
return code;
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLastRow2(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
|
||||
int16_t nCol = pTSchema->numOfCols;
|
||||
int16_t iCol = 0;
|
||||
int16_t noneCol = 0;
|
||||
bool setNoneCol = false;
|
||||
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
|
||||
SColVal *pColVal = &(SColVal){0};
|
||||
|
||||
// tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
||||
|
||||
TSKEY lastRowTs = TSKEY_MAX;
|
||||
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb);
|
||||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
nextRowIterGet(&iter, &pRow);
|
||||
|
||||
if (!pRow) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (lastRowTs == TSKEY_MAX) {
|
||||
lastRowTs = TSDBROW_TS(pRow);
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = lastRowTs});
|
||||
if (taosArrayPush(pColArray, pColVal) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
for (iCol = 1; iCol < nCol; ++iCol) {
|
||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||
|
||||
if (taosArrayPush(pColArray, pColVal) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (pColVal->isNone && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
}
|
||||
if (!setNoneCol) {
|
||||
// goto build the result ts row
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if ((TSDBROW_TS(pRow) < lastRowTs)) {
|
||||
// goto build the result ts row
|
||||
break;
|
||||
}
|
||||
|
||||
// merge into pColArray
|
||||
setNoneCol = false;
|
||||
for (iCol = noneCol; iCol < nCol; ++iCol) {
|
||||
// high version's column value
|
||||
SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, iCol);
|
||||
|
||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||
if (tColVal->isNone && !pColVal->isNone) {
|
||||
taosArraySet(pColArray, iCol, pColVal);
|
||||
} else if (tColVal->isNone && pColVal->isNone && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
}
|
||||
} while (setNoneCol);
|
||||
|
||||
// build the result ts row here
|
||||
*dup = false;
|
||||
if (taosArrayGetSize(pColArray) == nCol) {
|
||||
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
*ppRow = NULL;
|
||||
}
|
||||
|
||||
nextRowIterClose(&iter);
|
||||
taosArrayDestroy(pColArray);
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
nextRowIterClose(&iter);
|
||||
taosArrayDestroy(pColArray);
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLast2(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
|
||||
int32_t code = 0;
|
||||
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
|
||||
int16_t nCol = pTSchema->numOfCols;
|
||||
int16_t iCol = 0;
|
||||
int16_t noneCol = 0;
|
||||
bool setNoneCol = false;
|
||||
SArray *pColArray = taosArrayInit(nCol, sizeof(SLastCol));
|
||||
SColVal *pColVal = &(SColVal){0};
|
||||
|
||||
// tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
||||
|
||||
TSKEY lastRowTs = TSKEY_MAX;
|
||||
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb);
|
||||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
nextRowIterGet(&iter, &pRow);
|
||||
|
||||
if (!pRow) {
|
||||
break;
|
||||
}
|
||||
|
||||
TSKEY rowTs = TSDBROW_TS(pRow);
|
||||
|
||||
if (lastRowTs == TSKEY_MAX) {
|
||||
lastRowTs = rowTs;
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = lastRowTs});
|
||||
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
for (iCol = 1; iCol < nCol; ++iCol) {
|
||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||
|
||||
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if ((pColVal->isNone || pColVal->isNull) && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
}
|
||||
if (!setNoneCol) {
|
||||
// goto build the result ts row
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
/*
|
||||
if ((TSDBROW_TS(pRow) < lastRowTs)) {
|
||||
// goto build the result ts row
|
||||
break;
|
||||
}
|
||||
*/
|
||||
// merge into pColArray
|
||||
setNoneCol = false;
|
||||
for (iCol = noneCol; iCol < nCol; ++iCol) {
|
||||
// high version's column value
|
||||
SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, iCol);
|
||||
|
||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||
if ((tColVal->isNone || tColVal->isNull) && (!pColVal->isNone && !pColVal->isNull)) {
|
||||
taosArraySet(pColArray, iCol, &(SLastCol){.ts = rowTs, .colVal = *pColVal});
|
||||
//} else if (tColVal->isNone && pColVal->isNone && !setNoneCol) {
|
||||
} else if ((tColVal->isNone || tColVal->isNull) && (pColVal->isNone || pColVal->isNull) && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
}
|
||||
} while (setNoneCol);
|
||||
|
||||
// build the result ts row here
|
||||
//*dup = false;
|
||||
if (taosArrayGetSize(pColArray) <= 0) {
|
||||
*ppLastArray = NULL;
|
||||
taosArrayDestroy(pColArray);
|
||||
} else {
|
||||
*ppLastArray = pColArray;
|
||||
}
|
||||
/* if (taosArrayGetSize(pColArray) == nCol) {
|
||||
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
*ppRow = NULL;
|
||||
}*/
|
||||
|
||||
nextRowIterClose(&iter);
|
||||
// taosArrayDestroy(pColArray);
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
nextRowIterClose(&iter);
|
||||
// taosArrayDestroy(pColArray);
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
|
||||
int32_t code = 0;
|
||||
SArray *pSkyline = NULL;
|
||||
|
@ -682,7 +1144,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
|||
if (code) goto _err;
|
||||
}
|
||||
|
||||
int iSkyline = taosArrayGetSize(pSkyline) - 1;
|
||||
int64_t iSkyline = taosArrayGetSize(pSkyline) - 1;
|
||||
|
||||
SBlockIdx idx = {.suid = suid, .uid = uid};
|
||||
|
||||
|
@ -719,12 +1181,14 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
|||
do {
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (input[i].next && !input[i].stop) {
|
||||
code = input[i].nextRowFn(input[i].iter, &input[i].pRow);
|
||||
if (code) goto _err;
|
||||
|
||||
if (input[i].pRow == NULL) {
|
||||
input[i].stop = true;
|
||||
input[i].next = false;
|
||||
code = input[i].nextRowFn(input[i].iter, &input[i].pRow);
|
||||
if (code) goto _err;
|
||||
|
||||
if (input[i].pRow == NULL) {
|
||||
input[i].stop = true;
|
||||
input[i].next = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -758,14 +1222,14 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
|||
|
||||
// delete detection
|
||||
TSDBROW *merge[3] = {0};
|
||||
// int iMerge[3] = {-1, -1, -1};
|
||||
int nMerge = 0;
|
||||
int iMerge[3] = {-1, -1, -1};
|
||||
int nMerge = 0;
|
||||
for (int i = 0; i < nMax; ++i) {
|
||||
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
|
||||
|
||||
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
|
||||
if (!deleted) {
|
||||
// iMerge[nMerge] = i;
|
||||
iMerge[nMerge] = i;
|
||||
merge[nMerge++] = max[i];
|
||||
}
|
||||
|
||||
|
@ -792,7 +1256,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
|||
}
|
||||
}
|
||||
|
||||
} while (*ppRow == NULL);
|
||||
} while (1);
|
||||
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (input[i].nextRowClearFn) {
|
||||
|
@ -819,11 +1283,6 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
TSKEY ts;
|
||||
SColVal colVal;
|
||||
} SLastCol;
|
||||
|
||||
// static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
|
||||
int32_t code = 0;
|
||||
|
@ -873,7 +1332,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
|
|||
if (code) goto _err;
|
||||
}
|
||||
|
||||
int iSkyline = taosArrayGetSize(pSkyline) - 1;
|
||||
int64_t iSkyline = taosArrayGetSize(pSkyline) - 1;
|
||||
|
||||
SBlockIdx idx = {.suid = suid, .uid = uid};
|
||||
|
||||
|
@ -1128,7 +1587,7 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
|
|||
} else {
|
||||
STSRow *pRow = NULL;
|
||||
bool dup = false; // which is always false for now
|
||||
code = mergeLastRow(uid, pTsdb, &dup, &pRow);
|
||||
code = mergeLastRow2(uid, pTsdb, &dup, &pRow);
|
||||
// if table's empty or error, return code of -1
|
||||
if (code < 0 || pRow == NULL) {
|
||||
if (!dup && pRow) {
|
||||
|
@ -1195,7 +1654,8 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
|
|||
// STSRow *pRow = NULL;
|
||||
// code = mergeLast(uid, pTsdb, &pRow);
|
||||
SArray *pLastArray = NULL;
|
||||
code = mergeLast(uid, pTsdb, &pLastArray);
|
||||
// code = mergeLast(uid, pTsdb, &pLastArray);
|
||||
code = mergeLast2(uid, pTsdb, &pLastArray);
|
||||
// if table's empty or error, return code of -1
|
||||
// if (code < 0 || pRow == NULL) {
|
||||
if (code < 0 || pLastArray == NULL) {
|
||||
|
|
|
@ -561,7 +561,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
|
|||
}
|
||||
}
|
||||
|
||||
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow);
|
||||
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow, pMemTable->pTsdb);
|
||||
|
||||
pTbData->minVersion = TMIN(pTbData->minVersion, version);
|
||||
pTbData->maxVersion = TMAX(pTbData->maxVersion, version);
|
||||
|
|
Loading…
Reference in New Issue