tsdbCache/last: fix last loading crash issues
This commit is contained in:
parent
e3bb10218a
commit
ae2514efca
|
@ -253,7 +253,6 @@ int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||||
void tsdbCloseCache(SLRUCache *pCache);
|
void tsdbCloseCache(SLRUCache *pCache);
|
||||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
|
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
|
||||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup);
|
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup);
|
||||||
|
|
||||||
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
||||||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
||||||
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
|
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
|
||||||
|
|
|
@ -162,30 +162,13 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
|
||||||
char key[32] = {0};
|
char key[32] = {0};
|
||||||
int keyLen = 0;
|
int keyLen = 0;
|
||||||
|
|
||||||
|
((void)(row));
|
||||||
|
|
||||||
getTableCacheKey(uid, "l", key, &keyLen);
|
getTableCacheKey(uid, "l", key, &keyLen);
|
||||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||||
if (h) {
|
if (h) {
|
||||||
cacheRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
// clear last cache anyway, lazy load when get last lookup
|
||||||
if (row->ts >= cacheRow->ts) {
|
|
||||||
if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) {
|
|
||||||
tdRowCpy(cacheRow, row);
|
|
||||||
|
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
|
||||||
} else {
|
|
||||||
taosLRUCacheRelease(pCache, h, true);
|
taosLRUCacheRelease(pCache, h, true);
|
||||||
/* tsdbCacheDeleteLast(pCache, uid, TSKEY_MAX); */
|
|
||||||
tsdbCacheInsertLast(pCache, uid, row);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
cacheRow = tdRowDup(row);
|
|
||||||
|
|
||||||
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
|
|
||||||
LRUStatus status =
|
|
||||||
taosLRUCacheInsert(pCache, key, keyLen, cacheRow, TD_ROW_LEN(cacheRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW);
|
|
||||||
if (status != TAOS_LRU_STATUS_OK) {
|
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -283,8 +266,10 @@ static int32_t getTableDelSkyline(STbData *pMem, STbData *pIMem, SDelFReader *pD
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
size_t nDelData = taosArrayGetSize(aDelData);
|
size_t nDelData = taosArrayGetSize(aDelData);
|
||||||
|
if (nDelData > 0) {
|
||||||
code = tsdbBuildDeleteSkyline(aDelData, 0, (int32_t)(nDelData - 1), aSkyline);
|
code = tsdbBuildDeleteSkyline(aDelData, 0, (int32_t)(nDelData - 1), aSkyline);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayDestroy(aDelData);
|
taosArrayDestroy(aDelData);
|
||||||
|
|
||||||
|
@ -390,6 +375,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
||||||
|
|
||||||
state->nBlock = state->blockMap.nItem;
|
state->nBlock = state->blockMap.nItem;
|
||||||
state->iBlock = state->nBlock - 1;
|
state->iBlock = state->nBlock - 1;
|
||||||
|
|
||||||
|
tBlockDataInit(&state->blockData);
|
||||||
}
|
}
|
||||||
case SFSNEXTROW_BLOCKDATA:
|
case SFSNEXTROW_BLOCKDATA:
|
||||||
if (state->iBlock >= 0) {
|
if (state->iBlock >= 0) {
|
||||||
|
@ -420,6 +407,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
||||||
if (state->aBlockIdx) {
|
if (state->aBlockIdx) {
|
||||||
taosArrayDestroy(state->aBlockIdx);
|
taosArrayDestroy(state->aBlockIdx);
|
||||||
}
|
}
|
||||||
|
tBlockDataClear(&state->blockData);
|
||||||
|
|
||||||
state->state = SFSNEXTROW_FILESET;
|
state->state = SFSNEXTROW_FILESET;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -438,6 +427,7 @@ _err:
|
||||||
if (state->aBlockIdx) {
|
if (state->aBlockIdx) {
|
||||||
taosArrayDestroy(state->aBlockIdx);
|
taosArrayDestroy(state->aBlockIdx);
|
||||||
}
|
}
|
||||||
|
tBlockDataClear(&state->blockData);
|
||||||
|
|
||||||
*ppRow = NULL;
|
*ppRow = NULL;
|
||||||
|
|
||||||
|
@ -511,6 +501,15 @@ static int32_t tsRowFromTsdbRow(STSchema *pTSchema, TSDBROW *pRow, STSRow **ppRo
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||||
|
STColumn *pTColumn = &pTSchema->columns[0];
|
||||||
|
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts});
|
||||||
|
|
||||||
|
if (taosArrayPush(pArray, pColVal) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
|
for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
|
||||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||||
if (taosArrayPush(pArray, pColVal) == NULL) {
|
if (taosArrayPush(pArray, pColVal) == NULL) {
|
||||||
|
@ -683,7 +682,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
||||||
|
|
||||||
// delete detection
|
// delete detection
|
||||||
TSDBROW *merge[3] = {0};
|
TSDBROW *merge[3] = {0};
|
||||||
int iMerge[3] = {-1, -1, -1};
|
// int iMerge[3] = {-1, -1, -1};
|
||||||
int nMerge = 0;
|
int nMerge = 0;
|
||||||
for (int i = 0; i < nMax; ++i) {
|
for (int i = 0; i < nMax; ++i) {
|
||||||
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
|
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
|
||||||
|
@ -691,7 +690,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
||||||
// bool deleted = false;
|
// bool deleted = false;
|
||||||
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
|
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
|
||||||
if (!deleted) {
|
if (!deleted) {
|
||||||
iMerge[nMerge] = i;
|
// iMerge[nMerge] = i;
|
||||||
merge[nMerge++] = max[i];
|
merge[nMerge++] = max[i];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -857,7 +856,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||||
// bool deleted = false;
|
// bool deleted = false;
|
||||||
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
|
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
|
||||||
if (!deleted) {
|
if (!deleted) {
|
||||||
iMerge[nMerge] = i;
|
iMerge[nMerge] = iMax[i];
|
||||||
merge[nMerge++] = max[i];
|
merge[nMerge++] = max[i];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -896,8 +895,9 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||||
++iCol;
|
++iCol;
|
||||||
|
|
||||||
setICol = false;
|
setICol = false;
|
||||||
for (int16_t i = iCol; iCol < nCol; ++i) {
|
for (int16_t i = iCol; i < nCol; ++i) {
|
||||||
// tsdbRowGetColVal(*ppRow, pTSchema, i, pColVal);
|
// tsdbRowGetColVal(*ppRow, pTSchema, i, pColVal);
|
||||||
|
tTSRowGetVal(*ppRow, pTSchema, i, pColVal);
|
||||||
if (taosArrayPush(pColArray, pColVal) == NULL) {
|
if (taosArrayPush(pColArray, pColVal) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -936,7 +936,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||||
--nilColCount;
|
--nilColCount;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (tColVal->isNull || tColVal->isNone && !setICol) {
|
if ((tColVal->isNull || tColVal->isNone) && !setICol) {
|
||||||
iCol = i;
|
iCol = i;
|
||||||
setICol = true;
|
setICol = true;
|
||||||
|
|
||||||
|
@ -1020,7 +1020,13 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbCacheInsertLast(pCache, uid, pRow);
|
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
|
||||||
|
LRUStatus status =
|
||||||
|
taosLRUCacheInsert(pCache, key, keyLen, pRow, TD_ROW_LEN(pRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW);
|
||||||
|
if (status != TAOS_LRU_STATUS_OK) {
|
||||||
|
code = -1;
|
||||||
|
}
|
||||||
|
/* tsdbCacheInsertLast(pCache, uid, pRow); */
|
||||||
h = taosLRUCacheLookup(pCache, key, keyLen);
|
h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||||
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||||
}
|
}
|
||||||
|
|
|
@ -558,6 +558,8 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow);
|
||||||
|
|
||||||
pTbData->minVersion = TMIN(pTbData->minVersion, version);
|
pTbData->minVersion = TMIN(pTbData->minVersion, version);
|
||||||
pTbData->maxVersion = TMAX(pTbData->maxVersion, version);
|
pTbData->maxVersion = TMAX(pTbData->maxVersion, version);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue