tsdb/cache: merge lru with mem

This commit is contained in:
Minglei Jin 2024-12-05 17:13:59 +08:00
parent 65e6c04a2d
commit 2b062ed146
1 changed file with 315 additions and 92 deletions

View File

@ -644,30 +644,30 @@ int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
tsdbRowGetKey(pMemRow, &tsdbRowKey);
STSDBRowIter iter = {0};
TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, pMemRow, pTSchema), &lino, _exit);
TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));
int32_t iCol = 0;
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
if (!taosArrayPush(ctxArray, &updateCtx)) {
TAOS_CHECK_GOTO(terrno, &lino, _exit);
TAOS_CHECK_EXIT(terrno);
}
if (COL_VAL_IS_VALUE(pColVal)) {
updateCtx.lflag = LFLAG_LAST;
if (!taosArrayPush(ctxArray, &updateCtx)) {
TAOS_CHECK_GOTO(terrno, &lino, _exit);
TAOS_CHECK_EXIT(terrno);
}
} else {
if (!iColHash) {
iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
if (iColHash == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
TAOS_CHECK_EXIT(terrno);
}
}
if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) {
TAOS_CHECK_EXIT(terrno);
}
}
}
@ -680,28 +680,29 @@ int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
break;
}
sver = TSDBROW_SVERSION(pMemRow);
TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver));
pTSchema = pTsdb->rCache.pTSchema;
STsdbRowKey tsdbRowKey = {0};
tsdbRowGetKey(pMemRow, &tsdbRowKey);
void *pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
int32_t iCol = ((int32_t *)pIte)[0];
SColVal colVal = COL_VAL_NONE(0, 0);
tsdbRowGetColVal(pMemRow, pTSchema, iCol, &colVal);
if (COL_VAL_IS_VALUE(&colVal)) {
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
int32_t iCol = 0;
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
if (tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) {
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
if (!taosArrayPush(ctxArray, &updateCtx)) {
TAOS_CHECK_GOTO(terrno, &lino, _exit);
TAOS_CHECK_EXIT(terrno);
}
code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
code = tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid));
if (code != TSDB_CODE_SUCCESS) {
tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
__LINE__, tstrerror(code));
}
}
}
tsdbRowClose(&iter);
pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
}
@ -741,7 +742,7 @@ static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
if (nRow == 0 || nTbData == 0) return 0;
TAOS_CHECK_GOTO(tsdbMemTableSaveToCache(imem, tsdbLoadFromImem), &lino, _exit);
TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(imem, tsdbLoadFromImem));
_exit:
if (code) {
@ -2009,8 +2010,8 @@ _exit:
}
static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
int8_t ltype) {
int32_t code = 0;
int8_t ltype, SArray *keyArray) {
int32_t code = 0, lino = 0;
SArray *remainCols = NULL;
SArray *ignoreFromRocks = NULL;
SLRUCache *pCache = pTsdb->lruCache;
@ -2031,6 +2032,10 @@ static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLas
key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
}
if (!taosArrayPush(keyArray, &key)) {
TAOS_CHECK_EXIT(terrno);
}
LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN);
SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
@ -2224,8 +2229,8 @@ typedef struct {
STsdb *pTsdb;
} MemNextRowIter;
static int32_t MemRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
int32_t code = 0, lino = 0;
STbData *pMem = NULL;
@ -2274,7 +2279,7 @@ _exit:
}
static void memRowIterClose(MemNextRowIter *pIter) {
for (int i = 0; i < 3; ++i) {
for (int i = 0; i < 2; ++i) {
if (pIter->input[i].nextRowClearFn) {
(void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
}
@ -2289,35 +2294,299 @@ static void memRowIterClose(MemNextRowIter *pIter) {
}
}
static int32_t MemRowIterGet(MemNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
int16_t *aCols, int nCols) {
/*
 * Free callback installed on the per-reader table map.
 *
 * `param` points at a map slot that holds a heap pointer; the pointee is
 * released and the slot is reset to NULL.
 */
static void freeTableInfoFunc(void *param) {
  void **ppSlot = param;

  taosMemoryFreeClear(*ppSlot);
}
/*
 * Look up (and create on first use) the STableLoadInfo entry of table `uid`
 * in the reader's table map.
 *
 * The map itself is created lazily, keyed by the 64-bit uid, and stores
 * pointers to heap-allocated STableLoadInfo objects; freeTableInfoFunc is
 * registered as the map's free callback.
 *
 * Returns the entry, or NULL if the map or the entry could not be allocated
 * or the new entry could not be inserted.
 */
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
  if (!pReader->pTableMap) {
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (!pReader->pTableMap) {
      return NULL;
    }

    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
  }

  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
  if (ppInfo) {
    return *ppInfo;
  }

  STableLoadInfo *pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
  if (!pInfo) {
    return NULL;
  }

  if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) {
    // The map did not take the pointer, so nobody else will free it.
    taosMemoryFreeClear(pInfo);
    return NULL;
  }

  return pInfo;
}
/*
 * Return the next visible row from the two inputs of a MemNextRowIter.
 *
 * Each round:
 *   1. every input flagged `next` (and not stopped) is advanced through its
 *      nextRowFn; an input that yields no row is marked stopped,
 *   2. the current rows with the greatest row key are collected (both inputs
 *      when their keys compare equal),
 *   3. candidates covered by the delete skyline are dropped and their inputs
 *      are flagged to advance; the skyline is built on first use from the
 *      table's tomb data plus pIter->pMemDelData,
 *   4. the first surviving candidate is returned and its input is flagged to
 *      advance on the next call.
 *
 * isLast, aCols and nCols are forwarded unchanged to the inputs' nextRowFn.
 *
 * Returns the row, or NULL when both inputs are exhausted or when a step
 * fails. On failure the error is logged and stored in terrno.
 */
static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) {
  int32_t code = 0, lino = 0;

  for (;;) {
    for (int i = 0; i < 2; ++i) {
      if (pIter->input[i].next && !pIter->input[i].stop) {
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
                        &lino, _exit);

        if (pIter->input[i].pRow == NULL) {
          pIter->input[i].stop = true;
          pIter->input[i].next = false;
        }
      }
    }

    if (pIter->input[0].stop && pIter->input[1].stop) {
      return NULL;
    }

    TSDBROW *max[2] = {0};
    int      iMax[2] = {-1, -1};
    int      nMax = 0;
    SRowKey  maxKey = {.ts = TSKEY_MIN};

    for (int i = 0; i < 2; ++i) {
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
        STsdbRowKey tsdbRowKey = {0};
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);

        // merging & deduplicating on client side
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
        if (c <= 0) {
          if (c < 0) {
            // strictly greater key: restart the candidate list
            nMax = 0;
            maxKey = tsdbRowKey.key;
          }

          iMax[nMax] = i;
          max[nMax++] = pIter->input[i].pRow;
        }
        pIter->input[i].next = false;
      }
    }

    TSDBROW *merge[2] = {0};
    int      iMerge[2] = {-1, -1};
    int      nMerge = 0;
    for (int i = 0; i < nMax; ++i) {
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);

      if (!pIter->pSkyline) {
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno);

        uint64_t        uid = pIter->idx.uid;
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
        TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);

        if (pInfo->pTombData == NULL) {
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno);
        }

        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }

        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
        if (delSize > 0) {
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
          TAOS_CHECK_GOTO(code, &lino, _exit);
        }
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
      }

      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
      if (!deleted) {
        iMerge[nMerge] = iMax[i];
        merge[nMerge++] = max[i];
      }

      // a deleted candidate is skipped by advancing its input next round
      pIter->input[iMax[i]].next = deleted;
    }

    if (nMerge > 0) {
      pIter->input[iMerge[0]].next = true;

      return merge[0];
    }
  }

_exit:
  if (code) {
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
    // This function returns a row pointer, so the error code cannot be
    // returned directly; keep it in terrno and report "no row".
    terrno = code;
  }

  return NULL;
}
/*
 * Duplicate a schema into newly allocated memory.
 *
 * Copies the STSchema header together with its pSrc->numOfCols STColumn
 * entries and stores the copy in *ppDst (NULL when allocation fails).
 *
 * Returns TSDB_CODE_SUCCESS, or terrno if the allocation failed.
 */
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
  int32_t   nBytes = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
  STSchema *pCopy = taosMemoryMalloc(nBytes);

  *ppDst = pCopy;
  if (pCopy == NULL) {
    TAOS_RETURN(terrno);
  }

  memcpy(pCopy, pSrc, nBytes);
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
/*
 * Make pReader->pCurrSchema describe schema version `sversion`.
 *
 *  - No current schema yet and the reader's base schema already has the
 *    requested version: clone the base schema.
 *  - Current schema already has the requested version: nothing to do.
 *  - Otherwise: drop the current schema and fetch the requested version for
 *    (pReader->info.suid, uid) through metaGetTbTSchemaEx.
 *
 * Returns the result code of whichever step was taken.
 */
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
  STSchema *pCurr = pReader->pCurrSchema;

  if (pCurr != NULL) {
    if (pCurr->version == sversion) {
      TAOS_RETURN(TSDB_CODE_SUCCESS);
    }
  } else if (pReader->pSchema->version == sversion) {
    TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema));
  }

  // Version mismatch: discard whatever is cached and load the right one.
  taosMemoryFreeClear(pReader->pCurrSchema);
  TAOS_RETURN(
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema));
}
/*
 * Merge rows still held in memory into the last/last_row results that were
 * already read from the LRU cache.
 *
 * pLastArray holds one SLastCol per requested key and keyArray holds the
 * matching SLastKey at the same index. Rows are taken from a MemNextRowIter
 * opened for table `uid`.
 *
 *  - First row returned by the iterator:
 *      * last_row keys take the row's column when the row key is newer than
 *        the cached one, or equal and the column is not NONE;
 *      * last keys take the column when it carries a value and the row key is
 *        not older than the cached one;
 *      * last keys whose column has no value in this row are remembered in a
 *        cid -> pLastArray index map.
 *  - Following rows are scanned only for the remembered columns; the first
 *    row that carries a value for such a column settles it and the column is
 *    dropped from the map. Scanning stops once the map is empty.
 *
 * Returns 0 on success or the failing step's error code.
 */
static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
                                        SArray *keyArray) {
  int32_t        code = 0;
  int32_t        lino = 0;
  STSchema      *pTSchema = pr->pSchema;
  MemNextRowIter iter = {0};
  SSHashObj     *iColHash = NULL;
  // Declared before the first possible jump to _exit so the cleanup code
  // never looks at an uninitialised iterator.
  STSDBRowIter   rowIter = {0};
  bool           rowIterOpened = false;
  TSDBROW       *pRow = NULL;
  STsdbRowKey    rowKey = {0};
  int32_t        sversion = -1;
  int32_t        nCol = 0;
  int32_t        iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray);

  // 1, get from mem, imem filtered with delete info
  TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr));

  // 2, merge with lru entries from pLastArray
  pRow = memRowIterGet(&iter, false, NULL, 0);
  if (!pRow) {
    goto _exit;
  }

  sversion = TSDBROW_SVERSION(pRow);
  if (sversion != -1) {
    TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));

    pTSchema = pr->pCurrSchema;
  }
  nCol = pTSchema->numOfCols;

  tsdbRowGetKey(pRow, &rowKey);

  TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));
  rowIterOpened = true;

  for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol;
       pColVal = tsdbRowIterNext(&rowIter), ++iCol) {
    SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol];
    if (pColVal->cid < pTargetCol->colVal.cid) {
      continue;
    }
    // NOTE(review): a row column whose cid is greater than the target's cid is
    // treated as a match here; presumably callers guarantee every requested
    // cid exists in the row's schema - confirm.

    int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key);

    if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) {
      if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

        tsdbCacheFreeSLastColItem(pTargetCol);
        taosArraySet(pLastArray, jCol, &lastCol);
      }
    } else if (COL_VAL_IS_VALUE(pColVal)) {
      if (cmp_res <= 0) {
        // Take the value from the memory row; copying the cached value here
        // would only refresh the key and keep stale data.
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

        tsdbCacheFreeSLastColItem(pTargetCol);
        taosArraySet(pLastArray, jCol, &lastCol);
      }
    } else {
      // No value for this last column in the newest row: remember where its
      // result lives so older rows can fill it in.
      if (!iColHash) {
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
        if (iColHash == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
      }

      if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol))) {
        TAOS_CHECK_EXIT(terrno);
      }
    }

    ++jCol;
  }

  tsdbRowClose(&rowIter);
  rowIterOpened = false;

  pRow = memRowIterGet(&iter, false, NULL, 0);
  while (pRow) {
    if (iColHash == NULL || tSimpleHashGetSize(iColHash) == 0) {
      break;
    }

    sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));

      pTSchema = pr->pCurrSchema;
    }
    nCol = pTSchema->numOfCols;

    STsdbRowKey tsdbRowKey = {0};
    tsdbRowGetKey(pRow, &tsdbRowKey);

    TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));
    rowIterOpened = true;

    iCol = 0;
    for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol;
         pColVal = tsdbRowIterNext(&rowIter), iCol++) {
      int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid));
      if (pjCol && COL_VAL_IS_VALUE(pColVal)) {
        // Copy the index out: the map slot is gone after the remove below.
        int32_t   target = *pjCol;
        SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[target];

        // Compare against the key of the row being scanned, not the first row.
        int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key);
        if (cmp_res <= 0) {
          SLastCol lastCol = {
              .rowKey = tsdbRowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
          TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

          tsdbCacheFreeSLastColItem(pTargetCol);
          taosArraySet(pLastArray, target, &lastCol);
        }

        // Best effort: a failed remove is only traced and must not become the
        // function's return code.
        int32_t rmCode = tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid));
        if (rmCode != TSDB_CODE_SUCCESS) {
          tsdbTrace("vgId:%d, %s tSimpleHashRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
                    __LINE__, tstrerror(rmCode));
        }
      }
    }

    tsdbRowClose(&rowIter);
    rowIterOpened = false;

    pRow = memRowIterGet(&iter, false, NULL, 0);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }

  if (rowIterOpened) {
    tsdbRowClose(&rowIter);
  }

  if (iColHash) {
    tSimpleHashCleanup(iColHash);
  }

  memRowIterClose(&iter);

  TAOS_RETURN(code);
}
@ -2325,10 +2594,16 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
int32_t code = 0;
int32_t lino = 0;
TSDB_CHECK_CODE(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype), lino, _exit);
SArray *keyArray = taosArrayInit(16, sizeof(SLastKey));
if (!keyArray) {
TAOS_CHECK_EXIT(terrno);
}
// SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, keyArray));
if (tsUpdateCacheBatch) {
TSDB_CHECK_CODE(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, ltype), lino, _exit);
TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, keyArray));
}
_exit:
@ -2336,6 +2611,10 @@ _exit:
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
}
if (keyArray) {
taosArrayDestroy(keyArray);
}
TAOS_RETURN(code);
}
@ -2584,37 +2863,6 @@ static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
TAOS_RETURN(code);
}
/*
 * Free callback installed on the per-reader table map.
 *
 * `param` points at a map slot that holds a heap pointer; the pointee is
 * released and the slot is reset to NULL.
 */
static void freeTableInfoFunc(void *param) {
  void **ppSlot = param;

  taosMemoryFreeClear(*ppSlot);
}
/*
 * Look up (and create on first use) the STableLoadInfo entry of table `uid`
 * in the reader's table map.
 *
 * The map itself is created lazily, keyed by the 64-bit uid, and stores
 * pointers to heap-allocated STableLoadInfo objects; freeTableInfoFunc is
 * registered as the map's free callback.
 *
 * Returns the entry, or NULL if the map or the entry could not be allocated
 * or the new entry could not be inserted.
 */
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
  if (!pReader->pTableMap) {
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (!pReader->pTableMap) {
      return NULL;
    }

    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
  }

  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
  if (ppInfo) {
    return *ppInfo;
  }

  STableLoadInfo *pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
  if (!pInfo) {
    return NULL;
  }

  if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) {
    // The map did not take the pointer, so nobody else will free it.
    taosMemoryFreeClear(pInfo);
    return NULL;
  }

  return pInfo;
}
static uint64_t *getUidList(SCacheRowsReader *pReader) {
if (!pReader->uidList) {
int32_t numOfTables = pReader->numOfTables;
@ -3503,31 +3751,6 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray,
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
/*
 * Duplicate a schema into newly allocated memory.
 *
 * Copies the STSchema header together with its pSrc->numOfCols STColumn
 * entries and stores the copy in *ppDst (NULL when allocation fails).
 *
 * Returns TSDB_CODE_SUCCESS, or terrno if the allocation failed.
 */
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
  int32_t   nBytes = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
  STSchema *pCopy = taosMemoryMalloc(nBytes);

  *ppDst = pCopy;
  if (pCopy == NULL) {
    TAOS_RETURN(terrno);
  }

  memcpy(pCopy, pSrc, nBytes);
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
/*
 * Make pReader->pCurrSchema describe schema version `sversion`.
 *
 *  - No current schema yet and the reader's base schema already has the
 *    requested version: clone the base schema.
 *  - Current schema already has the requested version: nothing to do.
 *  - Otherwise: drop the current schema and fetch the requested version for
 *    (pReader->info.suid, uid) through metaGetTbTSchemaEx.
 *
 * Returns the result code of whichever step was taken.
 */
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
  STSchema *pCurr = pReader->pCurrSchema;

  if (pCurr != NULL) {
    if (pCurr->version == sversion) {
      TAOS_RETURN(TSDB_CODE_SUCCESS);
    }
  } else if (pReader->pSchema->version == sversion) {
    TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema));
  }

  // Version mismatch: discard whatever is cached and load the right one.
  taosMemoryFreeClear(pReader->pCurrSchema);
  TAOS_RETURN(
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema));
}
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
int nCols, int16_t *slotIds) {
int32_t code = 0, lino = 0;