From fc047ddafc76eb1f3a8d5c56ff2c5d0cc867e53d Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 14 May 2024 19:04:52 +0800 Subject: [PATCH] fix: last cache update rule --- include/common/tdataformat.h | 1 + source/common/src/tdataformat.c | 19 +- source/dnode/vnode/src/inc/tsdb.h | 10 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 462 ++++++++++++++++----- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 4 +- source/dnode/vnode/src/tsdb/tsdbUtil.c | 74 +++- 6 files changed, 448 insertions(+), 122 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 49ffd8ec35..7d35010e5c 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -133,6 +133,7 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc); int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); void tRowIterClose(SRowIter **ppIter); SColVal *tRowIterNext(SRowIter *pIter); +SColVal *tRowIterMoveTo(SRowIter *pIter, int32_t iTColum); // STag ================================ int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 04ad00e1dc..eeff53d9e0 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -774,7 +774,7 @@ void tRowIterClose(SRowIter **ppIter) { *ppIter = NULL; } -SColVal *tRowIterNext(SRowIter *pIter) { +static SColVal *tRowIterGetValue(SRowIter *pIter) { if (pIter->iTColumn >= pIter->pTSchema->numOfCols) { return NULL; } @@ -896,10 +896,25 @@ SColVal *tRowIterNext(SRowIter *pIter) { } _exit: - pIter->iTColumn++; return &pIter->cv; } +SColVal* tRowIterNext(SRowIter *pIter) { + SColVal* pColVal = tRowIterGetValue(pIter); + if (pColVal) { + pIter->iTColumn++; + } + return pColVal; +} + +SColVal *tRowIterMoveTo(SRowIter *pIter, int32_t iTColum) { + pIter->iTColumn = iTColum; + if (pIter->iTColumn < 0 || pIter->iTColumn >= pIter->pTSchema->numOfCols) { + return NULL; + } + return tRowIterGetValue(pIter); +} + static int32_t tRowNoneUpsertColData(SColData *aColData, int32_t nColData, int32_t flag) { int32_t code = 0; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 01db196479..071848cc0c 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -132,6 +132,7 @@ void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key); int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); void tsdbRowClose(STSDBRowIter *pIter); SColVal *tsdbRowIterNext(STSDBRowIter *pIter); +SColVal *tsdbRowIterMoveTo(STSDBRowIter *pIter, int32_t iCol); // SRowMerger int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pSchema); @@ -902,9 +903,16 @@ typedef struct { SColVal colVal; } SLastCol; +typedef struct { + int8_t lflag; + STsdbRowKey tsdbRowKey; + SColVal colVal; +} SLastUpdateCtx; + int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); -int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); +int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t nRow, SRow **aRow, TSDBROW *pLRow); +int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData, TSDBROW *pLRow); int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 6f26e3a63f..1034a21487 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -992,46 +992,30 @@ static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal } } -int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { +static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) { + if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) { + return 0; + } + int32_t code = 0; - // 1, fetch schema - STSchema *pTSchema = NULL; - int32_t sver = TSDBROW_SVERSION(pRow); - - code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; - } - - // 2, iterate col values into array - SArray *aColVal = taosArrayInit(32, sizeof(SColVal)); - - STSDBRowIter iter = {0}; - tsdbRowIterOpen(&iter, pRow, pTSchema); - - for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { - taosArrayPush(aColVal, pColVal); - } - - tsdbRowClose(&iter); - - // 3, build keys & multi get from rocks - int num_keys = TARRAY_SIZE(aColVal); + int num_keys = TARRAY_SIZE(updCtxArray); SArray *remainCols = NULL; SLRUCache *pCache = pTsdb->lruCache; - STsdbRowKey tsdbRowKey = {0}; - tsdbRowGetKey(pRow, &tsdbRowKey); - SRowKey *pRowKey = &tsdbRowKey.key; - taosThreadMutexLock(&pTsdb->lruMutex); for (int i = 0; i < num_keys; ++i) { - SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); - int16_t cid = pColVal->cid; + SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i); - SLastKey *key = &(SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid}; + int8_t lflag = updCtx->lflag; + SRowKey *pRowKey = &updCtx->tsdbRowKey.key; + SColVal *pColVal = &updCtx->colVal; + + if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) { + continue; + } + + SLastKey *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid}; size_t klen = ROCKS_KEY_LEN; LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); if (h) { @@ -1047,23 +1031,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow } taosArrayPush(remainCols, &(SIdxKey){i, *key}); } - - if (COL_VAL_IS_VALUE(pColVal)) { - key->lflag = LFLAG_LAST; - LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); - if (h) { - SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { - tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); - } - taosLRUCacheRelease(pCache, h, false); - } else { - if (!remainCols) { - remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); - } - taosArrayPush(remainCols, &(SIdxKey){i, *key}); - } - } } if (remainCols) { @@ -1091,12 +1058,17 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; for (int i = 0; i < num_keys; ++i) { SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; - SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx; - // SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx); + SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i); + SRowKey *pRowKey = &updCtx->tsdbRowKey.key; + SColVal *pColVal = &updCtx->colVal; SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]); SLastCol *PToFree = pLastCol; + if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) { + continue; + } + if (IS_LAST_ROW_KEY(idxKey->key)) { int32_t cmp_res = 1; if (pLastCol) { @@ -1142,48 +1114,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosMemoryFree(value); } - } else { - if (COL_VAL_IS_VALUE(pColVal)) { - if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { - char *value = NULL; - size_t vlen = 0; - SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; - tsdbCacheSerialize(&lastColTmp, &value, &vlen); - - taosThreadMutexLock(&pTsdb->rCache.rMutex); - - rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); - - taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - - pLastCol = &lastColTmp; - SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); - *pTmpLastCol = *pLastCol; - pLastCol = pTmpLastCol; - - size_t charge = sizeof(*pLastCol); - for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { - SValue *pValue = &pLastCol->rowKey.pks[i]; - if (IS_VAR_DATA_TYPE(pValue->type)) { - reallocVarDataVal(pValue); - charge += pValue->nData; - } - } - - if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { - reallocVarData(&pLastCol->colVal); - charge += pLastCol->colVal.value.nData; - } - - LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, - tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); - if (status != TAOS_LRU_STATUS_OK) { - code = -1; - } - - taosMemoryFree(value); - } - } } taosMemoryFreeClear(PToFree); @@ -1203,11 +1133,353 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosThreadMutexUnlock(&pTsdb->lruMutex); _exit: - taosArrayDestroy(aColVal); - taosMemoryFree(pTSchema); return code; } +int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t nRow, SRow **aRow, TSDBROW *pLRow) { + int32_t code = 0; + + // 1. prepare last + STSchema *pTSchema = NULL; + int32_t version = TSDBROW_VERSION(pLRow); + int32_t sver = TSDBROW_SVERSION(pLRow); + SArray *ctxArray = NULL; + SSHashObj *iColHash = NULL; + + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + goto _exit; + } + + TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version}; + int32_t nCol = pTSchema->numOfCols; + + ctxArray = taosArrayInit(nCol, sizeof(SLastUpdateCtx)); + iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + for (int32_t iCol = 0; iCol < nCol; ++iCol) { + tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0); + } + + for (int32_t iRow = nRow - 1; iRow >= 0; --iRow) { + if (tSimpleHashGetSize(iColHash) == 0) { + break; + } + + tRow.pTSRow = aRow[iRow]; + + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(&tRow, &tsdbRowKey); + + void *pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) { + int32_t iCol = ((int32_t *)pIte)[0]; + SColVal colVal = COL_VAL_NONE(0, 0); + tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal); + + if (COL_VAL_IS_VALUE(&colVal)) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, tsdbRowKey = tsdbRowKey, .colVal = colVal}; + taosArrayPush(ctxArray, &updateCtx); + tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter); + } + } + } + + // 2. prepare last row + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pLRow, &tsdbRowKey); + + STSDBRowIter iter = {0}; + tsdbRowIterOpen(&iter, pLRow, pTSchema); + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + taosArrayPush(ctxArray, &updateCtx); + } + tsdbRowClose(&iter); + + // 3. do update + tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); + +_exit: + taosMemoryFreeClear(pTSchema); + taosArrayDestroy(ctxArray); + tSimpleHashCleanup(iColHash); + return code; +} + +int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData, TSDBROW *pLRow) { + int32_t code = 0; + + STSchema *pTSchema = NULL; + int32_t sver = TSDBROW_SVERSION(pLRow); + SArray *ctxArray = NULL; + + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + goto _exit; + } + + ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx)); + + // 1. prepare last + TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0); + + for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) { + for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) { + SColData *pColData = &pBlockData->aColData[iColData]; + if ((pColData->flag & HAS_VALUE) == 0) { + continue; + } + + SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type); + tColDataGetValue(pColData, tRow.iRow, &colVal); + if (COL_VAL_IS_VALUE(&colVal)) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .colVal = colVal}; + taosArrayPush(ctxArray, &updateCtx); + break; + } + } + } + + // 2. prepare last row + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pLRow, &tsdbRowKey); + + STSDBRowIter iter = {0}; + tsdbRowIterOpen(&iter, pLRow, pTSchema); + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + taosArrayPush(ctxArray, &updateCtx); + } + tsdbRowClose(&iter); + + // 3. do update + tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); + +_exit: + taosMemoryFreeClear(pTSchema); + taosArrayDestroy(ctxArray); + return 0; +} + +// int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { +// int32_t code = 0; + +// // 1, fetch schema +// STSchema *pTSchema = NULL; +// int32_t sver = TSDBROW_SVERSION(pRow); + +// code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); +// if (code != TSDB_CODE_SUCCESS) { +// terrno = code; +// return -1; +// } + +// // 2, iterate col values into array +// SArray *aColVal = taosArrayInit(32, sizeof(SColVal)); + +// STSDBRowIter iter = {0}; +// tsdbRowIterOpen(&iter, pRow, pTSchema); + +// for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { +// taosArrayPush(aColVal, pColVal); +// } + +// tsdbRowClose(&iter); + +// // 3, build keys & multi get from rocks +// int num_keys = TARRAY_SIZE(aColVal); +// SArray *remainCols = NULL; +// SLRUCache *pCache = pTsdb->lruCache; + +// STsdbRowKey tsdbRowKey = {0}; +// tsdbRowGetKey(pRow, &tsdbRowKey); +// SRowKey *pRowKey = &tsdbRowKey.key; + +// taosThreadMutexLock(&pTsdb->lruMutex); +// for (int i = 0; i < num_keys; ++i) { +// SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); +// int16_t cid = pColVal->cid; + +// SLastKey *key = &(SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid}; +// size_t klen = ROCKS_KEY_LEN; +// LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); +// if (h) { +// SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); +// int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey); +// if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { +// tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); +// } +// taosLRUCacheRelease(pCache, h, false); +// } else { +// if (!remainCols) { +// remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); +// } +// taosArrayPush(remainCols, &(SIdxKey){i, *key}); +// } + +// if (COL_VAL_IS_VALUE(pColVal)) { +// key->lflag = LFLAG_LAST; +// LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); +// if (h) { +// SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); +// if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { +// tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); +// } +// taosLRUCacheRelease(pCache, h, false); +// } else { +// if (!remainCols) { +// remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); +// } +// taosArrayPush(remainCols, &(SIdxKey){i, *key}); +// } +// } +// } + +// if (remainCols) { +// num_keys = TARRAY_SIZE(remainCols); +// } +// if (remainCols && num_keys > 0) { +// char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *)); +// size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); +// for (int i = 0; i < num_keys; ++i) { +// SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; + +// keys_list[i] = (char *)&idxKey->key; +// keys_list_sizes[i] = ROCKS_KEY_LEN; +// } +// char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); +// size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); +// char **errs = taosMemoryCalloc(num_keys, sizeof(char *)); +// rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, +// keys_list_sizes, values_list, values_list_sizes, errs); +// for (int i = 0; i < num_keys; ++i) { +// rocksdb_free(errs[i]); +// } +// taosMemoryFree(errs); + +// rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; +// for (int i = 0; i < num_keys; ++i) { +// SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; +// SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx; +// // SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx); + +// SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]); +// SLastCol *PToFree = pLastCol; + +// if (IS_LAST_ROW_KEY(idxKey->key)) { +// int32_t cmp_res = 1; +// if (pLastCol) { +// cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey); +// } + +// if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { +// char *value = NULL; +// size_t vlen = 0; +// SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; +// tsdbCacheSerialize(&lastColTmp, &value, &vlen); + +// taosThreadMutexLock(&pTsdb->rCache.rMutex); + +// rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); + +// taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + +// pLastCol = &lastColTmp; +// SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); +// *pTmpLastCol = *pLastCol; +// pLastCol = pTmpLastCol; + +// size_t charge = sizeof(*pLastCol); +// for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { +// SValue *pValue = &pLastCol->rowKey.pks[i]; +// if (IS_VAR_DATA_TYPE(pValue->type)) { +// reallocVarDataVal(pValue); +// charge += pValue->nData; +// } +// } + +// if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { +// reallocVarData(&pLastCol->colVal); +// charge += pLastCol->colVal.value.nData; +// } + +// LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, +// tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); +// if (status != TAOS_LRU_STATUS_OK) { +// code = -1; +// } + +// taosMemoryFree(value); +// } +// } else { +// if (COL_VAL_IS_VALUE(pColVal)) { +// if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { +// char *value = NULL; +// size_t vlen = 0; +// SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; +// tsdbCacheSerialize(&lastColTmp, &value, &vlen); + +// taosThreadMutexLock(&pTsdb->rCache.rMutex); + +// rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); + +// taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + +// pLastCol = &lastColTmp; +// SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); +// *pTmpLastCol = *pLastCol; +// pLastCol = pTmpLastCol; + +// size_t charge = sizeof(*pLastCol); +// for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { +// SValue *pValue = &pLastCol->rowKey.pks[i]; +// if (IS_VAR_DATA_TYPE(pValue->type)) { +// reallocVarDataVal(pValue); +// charge += pValue->nData; +// } +// } + +// if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { +// reallocVarData(&pLastCol->colVal); +// charge += pLastCol->colVal.value.nData; +// } + +// LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, +// tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); +// if (status != TAOS_LRU_STATUS_OK) { +// code = -1; +// } + +// taosMemoryFree(value); +// } +// } +// } + +// taosMemoryFreeClear(PToFree); +// rocksdb_free(values_list[i]); +// } + +// rocksMayWrite(pTsdb, true, false, true); + +// taosMemoryFree(keys_list); +// taosMemoryFree(keys_list_sizes); +// taosMemoryFree(values_list); +// taosMemoryFree(values_list_sizes); + +// taosArrayDestroy(remainCols); +// } + +// taosThreadMutexUnlock(&pTsdb->lruMutex); + +// _exit: +// taosArrayDestroy(aColVal); +// taosMemoryFree(pTSchema); +// return code; +// } + static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index be15a4fecf..2ebc01775f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -664,7 +664,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, } if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); + tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData, &lRow); } // SMemTable @@ -727,7 +727,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, pTbData->maxKey = key.key.ts; } if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); + tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, nRow, aRow, &lRow); } // SMemTable diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 60459b0d21..e91f64b369 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -601,17 +601,21 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal * STColumn *pTColumn = &pTSchema->columns[iCol]; SValue value; - ASSERT(iCol > 0); - if (pRow->type == TSDBROW_ROW_FMT) { tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal); } else if (pRow->type == TSDBROW_COL_FMT) { - SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId); - - if (pColData) { - tColDataGetValue(pColData, pRow->iRow, pColVal); + if (iCol == 0) { + *pColVal = + COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, + ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pRow->pBlockData->aTSKEY[pRow->iRow]})); } else { - *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type); + SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId); + + if (pColData) { + tColDataGetValue(pColData, pRow->iRow, pColVal); + } else { + *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type); + } } } else { ASSERT(0); @@ -701,25 +705,51 @@ void tsdbRowClose(STSDBRowIter *pIter) { } } +static SColVal *tsdbRowColIterGetValue(STSDBRowIter *pIter) { + if (pIter->iColData == 0) { + pIter->cv = COL_VAL_VALUE( + PRIMARYKEY_TIMESTAMP_COL_ID, + ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]})); + return &pIter->cv; + } + + if (pIter->iColData <= pIter->pRow->pBlockData->nColData) { + tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv); + return &pIter->cv; + } else { + return NULL; + } +} + +static SColVal *tsdbRowColIterNext(STSDBRowIter *pIter) { + SColVal* pColVal = tsdbRowColIterGetValue(pIter); + if (pColVal) { + ++pIter->iColData; + } + return pColVal; +} + +static SColVal* tsdbRowColIterMoveTo(STSDBRowIter *pIter, int32_t iCol) { + pIter->iColData = iCol; + return tsdbRowColIterGetValue(pIter); +} + SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { if (pIter->pRow->type == TSDBROW_ROW_FMT) { return tRowIterNext(pIter->pIter); } else if (pIter->pRow->type == TSDBROW_COL_FMT) { - if (pIter->iColData == 0) { - pIter->cv = COL_VAL_VALUE( - PRIMARYKEY_TIMESTAMP_COL_ID, - ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]})); - ++pIter->iColData; - return &pIter->cv; - } + return tsdbRowColIterNext(pIter); + } else { + ASSERT(0); + return NULL; // suppress error report by compiler + } +} - if (pIter->iColData <= pIter->pRow->pBlockData->nColData) { - tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv); - ++pIter->iColData; - return &pIter->cv; - } else { - return NULL; - } +SColVal *tsdbRowIterMoveTo(STSDBRowIter *pIter, int32_t iCol) { + if (pIter->pRow->type == TSDBROW_ROW_FMT) { + return tRowIterMoveTo(pIter->pIter, iCol); + } else if (pIter->pRow->type == TSDBROW_COL_FMT) { + return tsdbRowColIterMoveTo(pIter, iCol); } else { ASSERT(0); return NULL; // suppress error report by compiler @@ -1820,4 +1850,4 @@ uint32_t tsdbCvtTimestampAlg(uint32_t alg) { DEFINE_VAR(alg) return 0; -} \ No newline at end of file +}