From 2b062ed146b70170ff0257c463975200f8df8aef Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 5 Dec 2024 17:13:59 +0800 Subject: [PATCH] tsdb/cache: merge lru with mem --- source/dnode/vnode/src/tsdb/tsdbCache.c | 407 ++++++++++++++++++------ 1 file changed, 315 insertions(+), 92 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index ade466cf08..57c2783984 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -644,30 +644,30 @@ int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) { tsdbRowGetKey(pMemRow, &tsdbRowKey); STSDBRowIter iter = {0}; - TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, pMemRow, pTSchema), &lino, _exit); + TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema)); int32_t iCol = 0; for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; if (!taosArrayPush(ctxArray, &updateCtx)) { - TAOS_CHECK_GOTO(terrno, &lino, _exit); + TAOS_CHECK_EXIT(terrno); } if (COL_VAL_IS_VALUE(pColVal)) { updateCtx.lflag = LFLAG_LAST; if (!taosArrayPush(ctxArray, &updateCtx)) { - TAOS_CHECK_GOTO(terrno, &lino, _exit); + TAOS_CHECK_EXIT(terrno); } } else { if (!iColHash) { - iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT)); if (iColHash == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); + TAOS_CHECK_EXIT(terrno); } } - if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); + if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) { + TAOS_CHECK_EXIT(terrno); } } } @@ -680,28 +680,29 @@ int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) { break; } + sver = TSDBROW_SVERSION(pMemRow); + TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver)); + pTSchema = pTsdb->rCache.pTSchema; + STsdbRowKey tsdbRowKey = {0}; tsdbRowGetKey(pMemRow, &tsdbRowKey); - void *pIte = NULL; - int32_t iter = 0; - while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) { - int32_t iCol = ((int32_t *)pIte)[0]; - SColVal colVal = COL_VAL_NONE(0, 0); - tsdbRowGetColVal(pMemRow, pTSchema, iCol, &colVal); - - if (COL_VAL_IS_VALUE(&colVal)) { - SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal}; + int32_t iCol = 0; + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { + if (tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; if (!taosArrayPush(ctxArray, &updateCtx)) { - TAOS_CHECK_GOTO(terrno, &lino, _exit); + TAOS_CHECK_EXIT(terrno); } - code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter); + + code = tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)); if (code != TSDB_CODE_SUCCESS) { tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); } } } + tsdbRowClose(&iter); pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline); } @@ -741,7 +742,7 @@ static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) { if (nRow == 0 || nTbData == 0) return 0; - TAOS_CHECK_GOTO(tsdbMemTableSaveToCache(imem, tsdbLoadFromImem), &lino, _exit); + TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(imem, tsdbLoadFromImem)); _exit: if (code) { @@ -2009,8 +2010,8 @@ _exit: } static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, - int8_t ltype) { - int32_t code = 0; + int8_t ltype, SArray *keyArray) { + int32_t code = 0, lino = 0; SArray *remainCols = NULL; SArray *ignoreFromRocks = NULL; SLRUCache *pCache = pTsdb->lruCache; @@ -2031,6 +2032,10 @@ static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLas key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3; } + if (!taosArrayPush(keyArray, &key)) { + TAOS_CHECK_EXIT(terrno); + } + LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN); SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL; if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) { @@ -2224,8 +2229,8 @@ typedef struct { STsdb *pTsdb; } MemNextRowIter; -static int32_t MemRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, - SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) { +static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, + STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) { int32_t code = 0, lino = 0; STbData *pMem = NULL; @@ -2274,7 +2279,7 @@ _exit: } static void memRowIterClose(MemNextRowIter *pIter) { - for (int i = 0; i < 3; ++i) { + for (int i = 0; i < 2; ++i) { if (pIter->input[i].nextRowClearFn) { (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter); } @@ -2289,35 +2294,299 @@ static void memRowIterClose(MemNextRowIter *pIter) { } } -static int32_t MemRowIterGet(MemNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, - int16_t *aCols, int nCols) { +static void freeTableInfoFunc(void *param) { + void **p = (void **)param; + taosMemoryFreeClear(*p); +} + +static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) { + if (!pReader->pTableMap) { + pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (!pReader->pTableMap) { + return NULL; + } + + tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc); + } + + STableLoadInfo *pInfo = NULL; + STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid)); + if (!ppInfo) { + pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo)); + if (pInfo) { + if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) { + return NULL; + } + } + + return pInfo; + } + + return *ppInfo; +} + +static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) { int32_t code = 0, lino = 0; + for (;;) { + for (int i = 0; i < 2; ++i) { + if (pIter->input[i].next && !pIter->input[i].stop) { + TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, + &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols), + &lino, _exit); + + if (pIter->input[i].pRow == NULL) { + pIter->input[i].stop = true; + pIter->input[i].next = false; + } + } + } + + if (pIter->input[0].stop && pIter->input[1].stop) { + return NULL; + } + + TSDBROW *max[2] = {0}; + int iMax[2] = {-1, -1}; + int nMax = 0; + SRowKey maxKey = {.ts = TSKEY_MIN}; + + for (int i = 0; i < 2; ++i) { + if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) { + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey); + + // merging & deduplicating on client side + int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key); + if (c <= 0) { + if (c < 0) { + nMax = 0; + maxKey = tsdbRowKey.key; + } + + iMax[nMax] = i; + max[nMax++] = pIter->input[i].pRow; + } + pIter->input[i].next = false; + } + } + + TSDBROW *merge[2] = {0}; + int iMerge[2] = {-1, -1}; + int nMerge = 0; + for (int i = 0; i < nMax; ++i) { + TSDBKEY maxKey1 = TSDBROW_KEY(max[i]); + + if (!pIter->pSkyline) { + pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); + TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno); + + uint64_t uid = pIter->idx.uid; + STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid); + TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY); + + if (pInfo->pTombData == NULL) { + pInfo->pTombData = taosArrayInit(4, sizeof(SDelData)); + TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno); + } + + if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) { + TAOS_CHECK_GOTO(terrno, &lino, _exit); + } + + size_t delSize = TARRAY_SIZE(pInfo->pTombData); + if (delSize > 0) { + code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1; + } + + bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline); + if (!deleted) { + iMerge[nMerge] = iMax[i]; + merge[nMerge++] = max[i]; + } + + pIter->input[iMax[i]].next = deleted; + } + + if (nMerge > 0) { + pIter->input[iMerge[0]].next = true; + + return merge[0]; + } + } + _exit: if (code) { tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code)); } - TAOS_RETURN(code); + return NULL; +} + +static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) { + int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols; + *ppDst = taosMemoryMalloc(len); + if (NULL == *ppDst) { + TAOS_RETURN(terrno); + } + memcpy(*ppDst, pSrc, len); + + TAOS_RETURN(TSDB_CODE_SUCCESS); +} + +static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) { + if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) { + TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema)); + } + + if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) { + TAOS_RETURN(TSDB_CODE_SUCCESS); + } + + taosMemoryFreeClear(pReader->pCurrSchema); + TAOS_RETURN( + metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema)); } static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, - int8_t ltype) { - int32_t code = 0; - int32_t lino = 0; - SLRUCache *pCache = pTsdb->lruCache; - SArray *pCidList = pr->pCidList; - int numKeys = TARRAY_SIZE(pCidList); + SArray *keyArray) { + int32_t code = 0; + int32_t lino = 0; + STSchema *pTSchema = pr->pSchema; + SLRUCache *pCache = pTsdb->lruCache; + SArray *pCidList = pr->pCidList; + int numKeys = TARRAY_SIZE(pCidList); + MemNextRowIter iter = {0}; + SSHashObj *iColHash = NULL; // 1, get from mem, imem filtered with delete info + TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr)); - // 2, merge with lru entries from pLastArray + TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0); + if (!pRow) { + goto _exit; + } + + int32_t sversion = TSDBROW_SVERSION(pRow); + if (sversion != -1) { + TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid)); + + pTSchema = pr->pCurrSchema; + } + int32_t nCol = pTSchema->numOfCols; + + STsdbRowKey rowKey = {0}; + tsdbRowGetKey(pRow, &rowKey); + + STSDBRowIter rowIter = {0}; + TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema)); + + int32_t iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray); + for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol; + pColVal = tsdbRowIterNext(&rowIter), ++iCol) { + SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol]; + if (pColVal->cid < pTargetCol->colVal.cid) { + continue; + } + + int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key); + if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) { + if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { + SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID}; + TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _exit); + + tsdbCacheFreeSLastColItem(pTargetCol); + taosArraySet(pLastArray, jCol, &lastCol); + } + } else { + if (COL_VAL_IS_VALUE(pColVal)) { + if (cmp_res <= 0) { + SLastCol lastCol = { + .rowKey = rowKey.key, .colVal = pTargetCol->colVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID}; + TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _exit); + + tsdbCacheFreeSLastColItem(pTargetCol); + taosArraySet(pLastArray, jCol, &lastCol); + } + } else { + if (!iColHash) { + iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT)); + if (iColHash == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + + if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol))) { + TAOS_CHECK_EXIT(terrno); + } + } + } + + ++jCol; + } + tsdbRowClose(&rowIter); + + pRow = memRowIterGet(&iter, false, NULL, 0); + while (pRow) { + if (tSimpleHashGetSize(iColHash) == 0) { + break; + } + + sversion = TSDBROW_SVERSION(pRow); + if (sversion != -1) { + TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _exit); + + pTSchema = pr->pCurrSchema; + } + nCol = pTSchema->numOfCols; + + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pRow, &tsdbRowKey); + + STSDBRowIter rowIter = {0}; + TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema)); + + iCol = 0; + for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol; + pColVal = tsdbRowIterNext(&rowIter), iCol++) { + int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)); + if (pjCol && COL_VAL_IS_VALUE(pColVal)) { + SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol]; + int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key); + + if (cmp_res <= 0) { + SLastCol lastCol = { + .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID}; + TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _exit); + + tsdbCacheFreeSLastColItem(pTargetCol); + taosArraySet(pLastArray, *pjCol, &lastCol); + } + + code = tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)); + if (code != TSDB_CODE_SUCCESS) { + tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, + __LINE__, tstrerror(code)); + } + } + } + tsdbRowClose(&rowIter); + + pRow = memRowIterGet(&iter, false, NULL, 0); + } _exit: if (code) { tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); + + tsdbRowClose(&rowIter); } + memRowIterClose(&iter); + TAOS_RETURN(code); } @@ -2325,10 +2594,16 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache int32_t code = 0; int32_t lino = 0; - TSDB_CHECK_CODE(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype), lino, _exit); + SArray *keyArray = taosArrayInit(16, sizeof(SLastKey)); + if (!keyArray) { + TAOS_CHECK_EXIT(terrno); + } + + // SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid}; + TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, keyArray)); if (tsUpdateCacheBatch) { - TSDB_CHECK_CODE(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, ltype), lino, _exit); + TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, keyArray)); } _exit: @@ -2336,6 +2611,10 @@ _exit: tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); } + if (keyArray) { + taosArrayDestroy(keyArray); + } + TAOS_RETURN(code); } @@ -2584,37 +2863,6 @@ static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) { TAOS_RETURN(code); } -static void freeTableInfoFunc(void *param) { - void **p = (void **)param; - taosMemoryFreeClear(*p); -} - -static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) { - if (!pReader->pTableMap) { - pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); - if (!pReader->pTableMap) { - return NULL; - } - - tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc); - } - - STableLoadInfo *pInfo = NULL; - STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid)); - if (!ppInfo) { - pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo)); - if (pInfo) { - if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) { - return NULL; - } - } - - return pInfo; - } - - return *ppInfo; -} - static uint64_t *getUidList(SCacheRowsReader *pReader) { if (!pReader->uidList) { int32_t numOfTables = pReader->numOfTables; @@ -3503,31 +3751,6 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, TAOS_RETURN(TSDB_CODE_SUCCESS); } -static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) { - int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols; - *ppDst = taosMemoryMalloc(len); - if (NULL == *ppDst) { - TAOS_RETURN(terrno); - } - memcpy(*ppDst, pSrc, len); - - TAOS_RETURN(TSDB_CODE_SUCCESS); -} - -static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) { - if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) { - TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema)); - } - - if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) { - TAOS_RETURN(TSDB_CODE_SUCCESS); - } - - taosMemoryFreeClear(pReader->pCurrSchema); - TAOS_RETURN( - metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema)); -} - static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds) { int32_t code = 0, lino = 0;