From 394da5d8bf9de323358c7a642d28839346c13486 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Mon, 15 Jul 2024 11:31:39 +0800 Subject: [PATCH] fix: (pk) load last cache from tsdb --- source/dnode/vnode/src/tsdb/tsdbCache.c | 217 +++--------------------- 1 file changed, 19 insertions(+), 198 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 4597a44845..e3c5098ccd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1307,186 +1307,6 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds); -#ifdef BUILD_NO_CALL -static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) { - SLastCol *pLastCol = NULL; - - char *err = NULL; - size_t vlen = 0; - SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; - size_t klen = ROCKS_KEY_LEN; - char *value = NULL; - value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } - - pLastCol = tsdbCacheDeserialize(value); - - return pLastCol; -} - -int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { - rocksdb_writebatch_t *wb = NULL; - int32_t code = 0; - - SArray *pCidList = pr->pCidList; - int num_keys = TARRAY_SIZE(pCidList); - - char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *)); - size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t)); - char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN); - for (int i = 0; i < num_keys; ++i) { - int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); - - memcpy(key_list + i * ROCKS_KEY_LEN, &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}, ROCKS_KEY_LEN); - keys_list[i] = key_list + i * ROCKS_KEY_LEN; - keys_list_sizes[i] = ROCKS_KEY_LEN; - } - - char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); - size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); - char **errs = taosMemoryMalloc(num_keys * sizeof(char *)); - rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, - keys_list_sizes, values_list, values_list_sizes, errs); - for (int i = 0; i < num_keys; ++i) { - if (errs[i]) { - rocksdb_free(errs[i]); - } - } - taosMemoryFree(key_list); - taosMemoryFree(keys_list); - taosMemoryFree(keys_list_sizes); - taosMemoryFree(errs); - - for (int i = 0; i < num_keys; ++i) { - bool freeCol = true; - SArray *pTmpColArray = NULL; - SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); - int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); - SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; - if (pLastCol) { - reallocVarData(&pLastCol->colVal); - } else { - taosThreadMutexLock(&pTsdb->rCache.rMutex); - - pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype); - if (!pLastCol) { - // recalc: load from tsdb - int16_t aCols[1] = {cid}; - int16_t slotIds[1] = {pr->pSlotIds[i]}; - pTmpColArray = NULL; - - if (ltype) { - mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); - } else { - mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); - } - - if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) { - pLastCol = taosArrayGet(pTmpColArray, 0); - freeCol = false; - } - - // still null, then make up a none col value - if (!pLastCol) { - pLastCol = &noneCol; - freeCol = false; - } - - // store result back to rocks cache - wb = pTsdb->rCache.rwritebatch; - char *value = NULL; - size_t vlen = 0; - tsdbCacheSerialize(pLastCol, &value, &vlen); - - SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = pLastCol->colVal.cid}; - size_t klen = ROCKS_KEY_LEN; - rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); - - taosMemoryFree(value); - } else { - reallocVarData(&pLastCol->colVal); - } - - if (wb) { - rocksMayWrite(pTsdb, false, true, false); - } - - taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - } - - taosArrayPush(pLastArray, pLastCol); - taosArrayDestroy(pTmpColArray); - if (freeCol) { - taosMemoryFree(pLastCol); - } - } - taosMemoryFree(values_list); - taosMemoryFree(values_list_sizes); - - return code; -} - -static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t slotid, tb_uid_t uid, int16_t cid, - int8_t ltype) { - SLastCol *pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype); - if (!pLastCol) { - rocksdb_writebatch_t *wb = NULL; - - taosThreadMutexLock(&pTsdb->rCache.rMutex); - pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype); - if (!pLastCol) { - // recalc: load from tsdb - int16_t aCols[1] = {cid}; - int16_t slotIds[1] = {slotid}; - SArray *pTmpColArray = NULL; - - if (ltype) { - mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); - } else { - mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); - } - - if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) { - pLastCol = taosArrayGet(pTmpColArray, 0); - } - - // still null, then make up a none col value - SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[slotid].type)}; - if (!pLastCol) { - pLastCol = &noneCol; - } - - // store result back to rocks cache - wb = pTsdb->rCache.rwritebatch; - char *value = NULL; - size_t vlen = 0; - tsdbCacheSerialize(pLastCol, &value, &vlen); - - SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = pLastCol->colVal.cid}; - size_t klen = ROCKS_KEY_LEN; - rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); - taosMemoryFree(value); - - SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); - *pTmpLastCol = *pLastCol; - pLastCol = pTmpLastCol; - - taosArrayDestroy(pTmpColArray); - } - - if (wb) { - rocksMayWrite(pTsdb, false, true, false); - } - - taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - } - - return pLastCol; -} -#endif static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols, SCacheRowsReader *pr, int8_t ltype) { @@ -3449,8 +3269,9 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC taosArrayPush(aColArray, &aCols[i]); } - TSKEY lastRowTs = TSKEY_MAX; + STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX}; + // inverse iterator CacheNextRowIter iter = {0}; nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr); @@ -3474,10 +3295,11 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC } // int16_t nCol = pTSchema->numOfCols; - TSKEY rowTs = TSDBROW_TS(pRow); + STsdbRowKey rowKey = {0}; + tsdbRowGetKey(pRow, &rowKey); - if (lastRowTs == TSKEY_MAX) { - lastRowTs = rowTs; + if (lastRowKey.key.ts == TSKEY_MAX) { // first time + lastRowKey = rowKey; for (int16_t iCol = noneCol; iCol < nCols; ++iCol) { if (iCol >= nLastCol) { @@ -3497,13 +3319,13 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC if (slotIds[iCol] == 0) { STColumn *pTColumn = &pTSchema->columns[0]; - *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs})); - taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}); + *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts})); + taosArraySet(pColArray, 0, &(SLastCol){.rowKey = rowKey.key, .colVal = *pColVal}); continue; } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - *pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}; + *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (pColVal->value.nData > 0) { pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); @@ -3550,10 +3372,11 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC continue; } SColVal *tColVal = &lastColVal->colVal; + if (COL_VAL_IS_VALUE(tColVal)) continue; tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) { - SLastCol lastCol = {.rowKey.ts = rowTs, .colVal = *pColVal}; + if (COL_VAL_IS_VALUE(pColVal)) { + SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) { SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); taosMemoryFree(pLastCol->colVal.value.pData); @@ -3576,7 +3399,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC if (aColIndex >= 0) { taosArrayRemove(aColArray, aColIndex); } - } else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) { + } else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) { noneCol = iCol; setNoneCol = true; } @@ -3633,8 +3456,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, taosArrayPush(aColArray, &aCols[i]); } - TSKEY lastRowTs = TSKEY_MAX; - + // inverse iterator CacheNextRowIter iter = {0}; nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr); @@ -3658,9 +3480,8 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, } // int16_t nCol = pTSchema->numOfCols; - TSKEY rowTs = TSDBROW_TS(pRow); - - lastRowTs = rowTs; + STsdbRowKey rowKey = {0}; + tsdbRowGetKey(pRow, &rowKey); for (int16_t iCol = noneCol; iCol < nCols; ++iCol) { if (iCol >= nLastCol) { @@ -3676,13 +3497,13 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, if (slotIds[iCol] == 0) { STColumn *pTColumn = &pTSchema->columns[0]; - *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs})); - taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}); + *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts})); + taosArraySet(pColArray, 0, &(SLastCol){.rowKey = rowKey.key, .colVal = *pColVal}); continue; } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - *pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}; + *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (pColVal->value.nData > 0) { pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);