From fc047ddafc76eb1f3a8d5c56ff2c5d0cc867e53d Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 14 May 2024 19:04:52 +0800 Subject: [PATCH 1/4] fix: last cache update rule --- include/common/tdataformat.h | 1 + source/common/src/tdataformat.c | 19 +- source/dnode/vnode/src/inc/tsdb.h | 10 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 462 ++++++++++++++++----- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 4 +- source/dnode/vnode/src/tsdb/tsdbUtil.c | 74 +++- 6 files changed, 448 insertions(+), 122 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 49ffd8ec35..7d35010e5c 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -133,6 +133,7 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc); int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); void tRowIterClose(SRowIter **ppIter); SColVal *tRowIterNext(SRowIter *pIter); +SColVal *tRowIterMoveTo(SRowIter *pIter, int32_t iTColum); // STag ================================ int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 04ad00e1dc..eeff53d9e0 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -774,7 +774,7 @@ void tRowIterClose(SRowIter **ppIter) { *ppIter = NULL; } -SColVal *tRowIterNext(SRowIter *pIter) { +static SColVal *tRowIterGetValue(SRowIter *pIter) { if (pIter->iTColumn >= pIter->pTSchema->numOfCols) { return NULL; } @@ -896,10 +896,25 @@ SColVal *tRowIterNext(SRowIter *pIter) { } _exit: - pIter->iTColumn++; return &pIter->cv; } +SColVal* tRowIterNext(SRowIter *pIter) { + SColVal* pColVal = tRowIterGetValue(pIter); + if (pColVal) { + pIter->iTColumn++; + } + return pColVal; +} + +SColVal *tRowIterMoveTo(SRowIter *pIter, int32_t iTColum) { + pIter->iTColumn = iTColum; + if (pIter->iTColumn < 0 || pIter->iTColumn >= pIter->pTSchema->numOfCols) { + return NULL; + } + return tRowIterGetValue(pIter); +} + static int32_t tRowNoneUpsertColData(SColData *aColData, int32_t nColData, int32_t flag) { int32_t code = 0; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 01db196479..071848cc0c 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -132,6 +132,7 @@ void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key); int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); void tsdbRowClose(STSDBRowIter *pIter); SColVal *tsdbRowIterNext(STSDBRowIter *pIter); +SColVal *tsdbRowIterMoveTo(STSDBRowIter *pIter, int32_t iCol); // SRowMerger int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pSchema); @@ -902,9 +903,16 @@ typedef struct { SColVal colVal; } SLastCol; +typedef struct { + int8_t lflag; + STsdbRowKey tsdbRowKey; + SColVal colVal; +} SLastUpdateCtx; + int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); -int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); +int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t nRow, SRow **aRow, TSDBROW *pLRow); +int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData, TSDBROW *pLRow); int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 6f26e3a63f..1034a21487 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -992,46 +992,30 @@ static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal } } -int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { +static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) { + if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) { + return 0; + } + int32_t code = 0; - // 1, fetch schema - STSchema *pTSchema = NULL; - int32_t sver = TSDBROW_SVERSION(pRow); - - code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; - } - - // 2, iterate col values into array - SArray *aColVal = taosArrayInit(32, sizeof(SColVal)); - - STSDBRowIter iter = {0}; - tsdbRowIterOpen(&iter, pRow, pTSchema); - - for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { - taosArrayPush(aColVal, pColVal); - } - - tsdbRowClose(&iter); - - // 3, build keys & multi get from rocks - int num_keys = TARRAY_SIZE(aColVal); + int num_keys = TARRAY_SIZE(updCtxArray); SArray *remainCols = NULL; SLRUCache *pCache = pTsdb->lruCache; - STsdbRowKey tsdbRowKey = {0}; - tsdbRowGetKey(pRow, &tsdbRowKey); - SRowKey *pRowKey = &tsdbRowKey.key; - taosThreadMutexLock(&pTsdb->lruMutex); for (int i = 0; i < num_keys; ++i) { - SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); - int16_t cid = pColVal->cid; + SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i); - SLastKey *key = &(SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid}; + int8_t lflag = updCtx->lflag; + SRowKey *pRowKey = &updCtx->tsdbRowKey.key; + SColVal *pColVal = &updCtx->colVal; + + if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) { + continue; + } + + SLastKey *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid}; size_t klen = ROCKS_KEY_LEN; LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); if (h) { @@ -1047,23 +1031,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow } taosArrayPush(remainCols, &(SIdxKey){i, *key}); } - - if (COL_VAL_IS_VALUE(pColVal)) { - key->lflag = LFLAG_LAST; - LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); - if (h) { - SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { - tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); - } - taosLRUCacheRelease(pCache, h, false); - } else { - if (!remainCols) { - remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); - } - taosArrayPush(remainCols, &(SIdxKey){i, *key}); - } - } } if (remainCols) { @@ -1091,12 +1058,17 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; for (int i = 0; i < num_keys; ++i) { SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; - SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx; - // SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx); + SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i); + SRowKey *pRowKey = &updCtx->tsdbRowKey.key; + SColVal *pColVal = &updCtx->colVal; SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]); SLastCol *PToFree = pLastCol; + if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) { + continue; + } + if (IS_LAST_ROW_KEY(idxKey->key)) { int32_t cmp_res = 1; if (pLastCol) { @@ -1142,48 +1114,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosMemoryFree(value); } - } else { - if (COL_VAL_IS_VALUE(pColVal)) { - if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { - char *value = NULL; - size_t vlen = 0; - SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; - tsdbCacheSerialize(&lastColTmp, &value, &vlen); - - taosThreadMutexLock(&pTsdb->rCache.rMutex); - - rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); - - taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - - pLastCol = &lastColTmp; - SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); - *pTmpLastCol = *pLastCol; - pLastCol = pTmpLastCol; - - size_t charge = sizeof(*pLastCol); - for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { - SValue *pValue = &pLastCol->rowKey.pks[i]; - if (IS_VAR_DATA_TYPE(pValue->type)) { - reallocVarDataVal(pValue); - charge += pValue->nData; - } - } - - if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { - reallocVarData(&pLastCol->colVal); - charge += pLastCol->colVal.value.nData; - } - - LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, - tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); - if (status != TAOS_LRU_STATUS_OK) { - code = -1; - } - - taosMemoryFree(value); - } - } } taosMemoryFreeClear(PToFree); @@ -1203,11 +1133,353 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosThreadMutexUnlock(&pTsdb->lruMutex); _exit: - taosArrayDestroy(aColVal); - taosMemoryFree(pTSchema); return code; } +int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t nRow, SRow **aRow, TSDBROW *pLRow) { + int32_t code = 0; + + // 1. prepare last + STSchema *pTSchema = NULL; + int32_t version = TSDBROW_VERSION(pLRow); + int32_t sver = TSDBROW_SVERSION(pLRow); + SArray *ctxArray = NULL; + SSHashObj *iColHash = NULL; + + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + goto _exit; + } + + TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version}; + int32_t nCol = pTSchema->numOfCols; + + ctxArray = taosArrayInit(nCol, sizeof(SLastUpdateCtx)); + iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + for (int32_t iCol = 0; iCol < nCol; ++iCol) { + tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0); + } + + for (int32_t iRow = nRow - 1; iRow >= 0; --iRow) { + if (tSimpleHashGetSize(iColHash) == 0) { + break; + } + + tRow.pTSRow = aRow[iRow]; + + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(&tRow, &tsdbRowKey); + + void *pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) { + int32_t iCol = ((int32_t *)pIte)[0]; + SColVal colVal = COL_VAL_NONE(0, 0); + tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal); + + if (COL_VAL_IS_VALUE(&colVal)) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, tsdbRowKey = tsdbRowKey, .colVal = colVal}; + taosArrayPush(ctxArray, &updateCtx); + tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter); + } + } + } + + // 2. prepare last row + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pLRow, &tsdbRowKey); + + STSDBRowIter iter = {0}; + tsdbRowIterOpen(&iter, pLRow, pTSchema); + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + taosArrayPush(ctxArray, &updateCtx); + } + tsdbRowClose(&iter); + + // 3. do update + tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); + +_exit: + taosMemoryFreeClear(pTSchema); + taosArrayDestroy(ctxArray); + tSimpleHashCleanup(iColHash); + return code; +} + +int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData, TSDBROW *pLRow) { + int32_t code = 0; + + STSchema *pTSchema = NULL; + int32_t sver = TSDBROW_SVERSION(pLRow); + SArray *ctxArray = NULL; + + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + goto _exit; + } + + ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx)); + + // 1. prepare last + TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0); + + for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) { + for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) { + SColData *pColData = &pBlockData->aColData[iColData]; + if ((pColData->flag & HAS_VALUE) == 0) { + continue; + } + + SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type); + tColDataGetValue(pColData, tRow.iRow, &colVal); + if (COL_VAL_IS_VALUE(&colVal)) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .colVal = colVal}; + taosArrayPush(ctxArray, &updateCtx); + break; + } + } + } + + // 2. prepare last row + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pLRow, &tsdbRowKey); + + STSDBRowIter iter = {0}; + tsdbRowIterOpen(&iter, pLRow, pTSchema); + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + taosArrayPush(ctxArray, &updateCtx); + } + tsdbRowClose(&iter); + + // 3. do update + tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); + +_exit: + taosMemoryFreeClear(pTSchema); + taosArrayDestroy(ctxArray); + return 0; +} + +// int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { +// int32_t code = 0; + +// // 1, fetch schema +// STSchema *pTSchema = NULL; +// int32_t sver = TSDBROW_SVERSION(pRow); + +// code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); +// if (code != TSDB_CODE_SUCCESS) { +// terrno = code; +// return -1; +// } + +// // 2, iterate col values into array +// SArray *aColVal = taosArrayInit(32, sizeof(SColVal)); + +// STSDBRowIter iter = {0}; +// tsdbRowIterOpen(&iter, pRow, pTSchema); + +// for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { +// taosArrayPush(aColVal, pColVal); +// } + +// tsdbRowClose(&iter); + +// // 3, build keys & multi get from rocks +// int num_keys = TARRAY_SIZE(aColVal); +// SArray *remainCols = NULL; +// SLRUCache *pCache = pTsdb->lruCache; + +// STsdbRowKey tsdbRowKey = {0}; +// tsdbRowGetKey(pRow, &tsdbRowKey); +// SRowKey *pRowKey = &tsdbRowKey.key; + +// taosThreadMutexLock(&pTsdb->lruMutex); +// for (int i = 0; i < num_keys; ++i) { +// SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); +// int16_t cid = pColVal->cid; + +// SLastKey *key = &(SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid}; +// size_t klen = ROCKS_KEY_LEN; +// LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); +// if (h) { +// SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); +// int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey); +// if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { +// tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); +// } +// taosLRUCacheRelease(pCache, h, false); +// } else { +// if (!remainCols) { +// remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); +// } +// taosArrayPush(remainCols, &(SIdxKey){i, *key}); +// } + +// if (COL_VAL_IS_VALUE(pColVal)) { +// key->lflag = LFLAG_LAST; +// LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); +// if (h) { +// SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); +// if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { +// tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); +// } +// taosLRUCacheRelease(pCache, h, false); +// } else { +// if (!remainCols) { +// remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); +// } +// taosArrayPush(remainCols, &(SIdxKey){i, *key}); +// } +// } +// } + +// if (remainCols) { +// num_keys = TARRAY_SIZE(remainCols); +// } +// if (remainCols && num_keys > 0) { +// char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *)); +// size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); +// for (int i = 0; i < num_keys; ++i) { +// SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; + +// keys_list[i] = (char *)&idxKey->key; +// keys_list_sizes[i] = ROCKS_KEY_LEN; +// } +// char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); +// size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); +// char **errs = taosMemoryCalloc(num_keys, sizeof(char *)); +// rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, +// keys_list_sizes, values_list, values_list_sizes, errs); +// for (int i = 0; i < num_keys; ++i) { +// rocksdb_free(errs[i]); +// } +// taosMemoryFree(errs); + +// rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; +// for (int i = 0; i < num_keys; ++i) { +// SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; +// SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx; +// // SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx); + +// SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]); +// SLastCol *PToFree = pLastCol; + +// if (IS_LAST_ROW_KEY(idxKey->key)) { +// int32_t cmp_res = 1; +// if (pLastCol) { +// cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey); +// } + +// if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { +// char *value = NULL; +// size_t vlen = 0; +// SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; +// tsdbCacheSerialize(&lastColTmp, &value, &vlen); + +// taosThreadMutexLock(&pTsdb->rCache.rMutex); + +// rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); + +// taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + +// pLastCol = &lastColTmp; +// SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); +// *pTmpLastCol = *pLastCol; +// pLastCol = pTmpLastCol; + +// size_t charge = sizeof(*pLastCol); +// for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { +// SValue *pValue = &pLastCol->rowKey.pks[i]; +// if (IS_VAR_DATA_TYPE(pValue->type)) { +// reallocVarDataVal(pValue); +// charge += pValue->nData; +// } +// } + +// if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { +// reallocVarData(&pLastCol->colVal); +// charge += pLastCol->colVal.value.nData; +// } + +// LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, +// tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); +// if (status != TAOS_LRU_STATUS_OK) { +// code = -1; +// } + +// taosMemoryFree(value); +// } +// } else { +// if (COL_VAL_IS_VALUE(pColVal)) { +// if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { +// char *value = NULL; +// size_t vlen = 0; +// SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; +// tsdbCacheSerialize(&lastColTmp, &value, &vlen); + +// taosThreadMutexLock(&pTsdb->rCache.rMutex); + +// rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); + +// taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + +// pLastCol = &lastColTmp; +// SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); +// *pTmpLastCol = *pLastCol; +// pLastCol = pTmpLastCol; + +// size_t charge = sizeof(*pLastCol); +// for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { +// SValue *pValue = &pLastCol->rowKey.pks[i]; +// if (IS_VAR_DATA_TYPE(pValue->type)) { +// reallocVarDataVal(pValue); +// charge += pValue->nData; +// } +// } + +// if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { +// reallocVarData(&pLastCol->colVal); +// charge += pLastCol->colVal.value.nData; +// } + +// LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, +// tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); +// if (status != TAOS_LRU_STATUS_OK) { +// code = -1; +// } + +// taosMemoryFree(value); +// } +// } +// } + +// taosMemoryFreeClear(PToFree); +// rocksdb_free(values_list[i]); +// } + +// rocksMayWrite(pTsdb, true, false, true); + +// taosMemoryFree(keys_list); +// taosMemoryFree(keys_list_sizes); +// taosMemoryFree(values_list); +// taosMemoryFree(values_list_sizes); + +// taosArrayDestroy(remainCols); +// } + +// taosThreadMutexUnlock(&pTsdb->lruMutex); + +// _exit: +// taosArrayDestroy(aColVal); +// taosMemoryFree(pTSchema); +// return code; +// } + static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index be15a4fecf..2ebc01775f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -664,7 +664,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, } if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); + tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData, &lRow); } // SMemTable @@ -727,7 +727,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, pTbData->maxKey = key.key.ts; } if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); + tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, nRow, aRow, &lRow); } // SMemTable diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 60459b0d21..e91f64b369 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -601,17 +601,21 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal * STColumn *pTColumn = &pTSchema->columns[iCol]; SValue value; - ASSERT(iCol > 0); - if (pRow->type == TSDBROW_ROW_FMT) { tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal); } else if (pRow->type == TSDBROW_COL_FMT) { - SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId); - - if (pColData) { - tColDataGetValue(pColData, pRow->iRow, pColVal); + if (iCol == 0) { + *pColVal = + COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, + ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pRow->pBlockData->aTSKEY[pRow->iRow]})); } else { - *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type); + SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId); + + if (pColData) { + tColDataGetValue(pColData, pRow->iRow, pColVal); + } else { + *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type); + } } } else { ASSERT(0); @@ -701,25 +705,51 @@ void tsdbRowClose(STSDBRowIter *pIter) { } } +static SColVal *tsdbRowColIterGetValue(STSDBRowIter *pIter) { + if (pIter->iColData == 0) { + pIter->cv = COL_VAL_VALUE( + PRIMARYKEY_TIMESTAMP_COL_ID, + ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]})); + return &pIter->cv; + } + + if (pIter->iColData <= pIter->pRow->pBlockData->nColData) { + tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv); + return &pIter->cv; + } else { + return NULL; + } +} + +static SColVal *tsdbRowColIterNext(STSDBRowIter *pIter) { + SColVal* pColVal = tsdbRowColIterGetValue(pIter); + if (pColVal) { + ++pIter->iColData; + } + return pColVal; +} + +static SColVal* tsdbRowColIterMoveTo(STSDBRowIter *pIter, int32_t iCol) { + pIter->iColData = iCol; + return tsdbRowColIterGetValue(pIter); +} + SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { if (pIter->pRow->type == TSDBROW_ROW_FMT) { return tRowIterNext(pIter->pIter); } else if (pIter->pRow->type == TSDBROW_COL_FMT) { - if (pIter->iColData == 0) { - pIter->cv = COL_VAL_VALUE( - PRIMARYKEY_TIMESTAMP_COL_ID, - ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]})); - ++pIter->iColData; - return &pIter->cv; - } + return tsdbRowColIterNext(pIter); + } else { + ASSERT(0); + return NULL; // suppress error report by compiler + } +} - if (pIter->iColData <= pIter->pRow->pBlockData->nColData) { - tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv); - ++pIter->iColData; - return &pIter->cv; - } else { - return NULL; - } +SColVal *tsdbRowIterMoveTo(STSDBRowIter *pIter, int32_t iCol) { + if (pIter->pRow->type == TSDBROW_ROW_FMT) { + return tRowIterMoveTo(pIter->pIter, iCol); + } else if (pIter->pRow->type == TSDBROW_COL_FMT) { + return tsdbRowColIterMoveTo(pIter, iCol); } else { ASSERT(0); return NULL; // suppress error report by compiler @@ -1820,4 +1850,4 @@ uint32_t tsdbCvtTimestampAlg(uint32_t alg) { DEFINE_VAR(alg) return 0; -} \ No newline at end of file +} From 5c952fc92263ee0810ccc349310d235e95e72814 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Fri, 17 May 2024 09:10:35 +0800 Subject: [PATCH 2/4] enh: cleanup --- source/dnode/vnode/src/inc/tsdb.h | 5 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 257 ++------------------- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 11 +- source/dnode/vnode/src/tsdb/tsdbUtil.c | 54 ++--- 4 files changed, 43 insertions(+), 284 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 071848cc0c..d54109852c 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -132,7 +132,6 @@ void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key); int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); void tsdbRowClose(STSDBRowIter *pIter); SColVal *tsdbRowIterNext(STSDBRowIter *pIter); -SColVal *tsdbRowIterMoveTo(STSDBRowIter *pIter, int32_t iCol); // SRowMerger int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pSchema); @@ -911,8 +910,8 @@ typedef struct { int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); -int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t nRow, SRow **aRow, TSDBROW *pLRow); -int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData, TSDBROW *pLRow); +int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow, SRow **aRow); +int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData); int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 1034a21487..02d4ffcf00 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1136,13 +1136,15 @@ _exit: return code; } -int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t nRow, SRow **aRow, TSDBROW *pLRow) { +int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow, + SRow **aRow) { int32_t code = 0; // 1. prepare last + TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version}; + STSchema *pTSchema = NULL; - int32_t version = TSDBROW_VERSION(pLRow); - int32_t sver = TSDBROW_SVERSION(pLRow); + int32_t sver = TSDBROW_SVERSION(&lRow); SArray *ctxArray = NULL; SSHashObj *iColHash = NULL; @@ -1179,7 +1181,7 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int3 tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal); if (COL_VAL_IS_VALUE(&colVal)) { - SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, tsdbRowKey = tsdbRowKey, .colVal = colVal}; + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal}; taosArrayPush(ctxArray, &updateCtx); tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter); } @@ -1188,12 +1190,12 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int3 // 2. prepare last row STsdbRowKey tsdbRowKey = {0}; - tsdbRowGetKey(pLRow, &tsdbRowKey); + tsdbRowGetKey(&lRow, &tsdbRowKey); STSDBRowIter iter = {0}; - tsdbRowIterOpen(&iter, pLRow, pTSchema); + tsdbRowIterOpen(&iter, &lRow, pTSchema); for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { - SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; taosArrayPush(ctxArray, &updateCtx); } tsdbRowClose(&iter); @@ -1208,11 +1210,13 @@ _exit: return code; } -int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData, TSDBROW *pLRow) { +int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) { int32_t code = 0; + TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1); + STSchema *pTSchema = NULL; - int32_t sver = TSDBROW_SVERSION(pLRow); + int32_t sver = TSDBROW_SVERSION(&lRow); SArray *ctxArray = NULL; code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); @@ -1233,10 +1237,15 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo continue; } - SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type); - tColDataGetValue(pColData, tRow.iRow, &colVal); - if (COL_VAL_IS_VALUE(&colVal)) { - SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .colVal = colVal}; + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(&tRow, &tsdbRowKey); + + uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow); + if (colType == 2) { + SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type); + tColDataGetValue(pColData, tRow.iRow, &colVal); + + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal}; taosArrayPush(ctxArray, &updateCtx); break; } @@ -1245,12 +1254,12 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo // 2. prepare last row STsdbRowKey tsdbRowKey = {0}; - tsdbRowGetKey(pLRow, &tsdbRowKey); + tsdbRowGetKey(&lRow, &tsdbRowKey); STSDBRowIter iter = {0}; - tsdbRowIterOpen(&iter, pLRow, pTSchema); + tsdbRowIterOpen(&iter, &lRow, pTSchema); for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { - SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; taosArrayPush(ctxArray, &updateCtx); } tsdbRowClose(&iter); @@ -1264,222 +1273,6 @@ _exit: return 0; } -// int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { -// int32_t code = 0; - -// // 1, fetch schema -// STSchema *pTSchema = NULL; -// int32_t sver = TSDBROW_SVERSION(pRow); - -// code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); -// if (code != TSDB_CODE_SUCCESS) { -// terrno = code; -// return -1; -// } - -// // 2, iterate col values into array -// SArray *aColVal = taosArrayInit(32, sizeof(SColVal)); - -// STSDBRowIter iter = {0}; -// tsdbRowIterOpen(&iter, pRow, pTSchema); - -// for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { -// taosArrayPush(aColVal, pColVal); -// } - -// tsdbRowClose(&iter); - -// // 3, build keys & multi get from rocks -// int num_keys = TARRAY_SIZE(aColVal); -// SArray *remainCols = NULL; -// SLRUCache *pCache = pTsdb->lruCache; - -// STsdbRowKey tsdbRowKey = {0}; -// tsdbRowGetKey(pRow, &tsdbRowKey); -// SRowKey *pRowKey = &tsdbRowKey.key; - -// taosThreadMutexLock(&pTsdb->lruMutex); -// for (int i = 0; i < num_keys; ++i) { -// SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); -// int16_t cid = pColVal->cid; - -// SLastKey *key = &(SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid}; -// size_t klen = ROCKS_KEY_LEN; -// LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); -// if (h) { -// SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); -// int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey); -// if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { -// tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); -// } -// taosLRUCacheRelease(pCache, h, false); -// } else { -// if (!remainCols) { -// remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); -// } -// taosArrayPush(remainCols, &(SIdxKey){i, *key}); -// } - -// if (COL_VAL_IS_VALUE(pColVal)) { -// key->lflag = LFLAG_LAST; -// LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); -// if (h) { -// SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); -// if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { -// tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); -// } -// taosLRUCacheRelease(pCache, h, false); -// } else { -// if (!remainCols) { -// remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); -// } -// taosArrayPush(remainCols, &(SIdxKey){i, *key}); -// } -// } -// } - -// if (remainCols) { -// num_keys = TARRAY_SIZE(remainCols); -// } -// if (remainCols && num_keys > 0) { -// char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *)); -// size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); -// for (int i = 0; i < num_keys; ++i) { -// SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; - -// keys_list[i] = (char *)&idxKey->key; -// keys_list_sizes[i] = ROCKS_KEY_LEN; -// } -// char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); -// size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); -// char **errs = taosMemoryCalloc(num_keys, sizeof(char *)); -// rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, -// keys_list_sizes, values_list, values_list_sizes, errs); -// for (int i = 0; i < num_keys; ++i) { -// rocksdb_free(errs[i]); -// } -// taosMemoryFree(errs); - -// rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; -// for (int i = 0; i < num_keys; ++i) { -// SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; -// SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx; -// // SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx); - -// SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]); -// SLastCol *PToFree = pLastCol; - -// if (IS_LAST_ROW_KEY(idxKey->key)) { -// int32_t cmp_res = 1; -// if (pLastCol) { -// cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey); -// } - -// if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { -// char *value = NULL; -// size_t vlen = 0; -// SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; -// tsdbCacheSerialize(&lastColTmp, &value, &vlen); - -// taosThreadMutexLock(&pTsdb->rCache.rMutex); - -// rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); - -// taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - -// pLastCol = &lastColTmp; -// SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); -// *pTmpLastCol = *pLastCol; -// pLastCol = pTmpLastCol; - -// size_t charge = sizeof(*pLastCol); -// for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { -// SValue *pValue = &pLastCol->rowKey.pks[i]; -// if (IS_VAR_DATA_TYPE(pValue->type)) { -// reallocVarDataVal(pValue); -// charge += pValue->nData; -// } -// } - -// if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { -// reallocVarData(&pLastCol->colVal); -// charge += pLastCol->colVal.value.nData; -// } - -// LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, -// tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); -// if (status != TAOS_LRU_STATUS_OK) { -// code = -1; -// } - -// taosMemoryFree(value); -// } -// } else { -// if (COL_VAL_IS_VALUE(pColVal)) { -// if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { -// char *value = NULL; -// size_t vlen = 0; -// SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; -// tsdbCacheSerialize(&lastColTmp, &value, &vlen); - -// taosThreadMutexLock(&pTsdb->rCache.rMutex); - -// rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); - -// taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - -// pLastCol = &lastColTmp; -// SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); -// *pTmpLastCol = *pLastCol; -// pLastCol = pTmpLastCol; - -// size_t charge = sizeof(*pLastCol); -// for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { -// SValue *pValue = &pLastCol->rowKey.pks[i]; -// if (IS_VAR_DATA_TYPE(pValue->type)) { -// reallocVarDataVal(pValue); -// charge += pValue->nData; -// } -// } - -// if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { -// reallocVarData(&pLastCol->colVal); -// charge += pLastCol->colVal.value.nData; -// } - -// LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, -// tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); -// if (status != TAOS_LRU_STATUS_OK) { -// code = -1; -// } - -// taosMemoryFree(value); -// } -// } -// } - -// taosMemoryFreeClear(PToFree); -// rocksdb_free(values_list[i]); -// } - -// rocksMayWrite(pTsdb, true, false, true); - -// taosMemoryFree(keys_list); -// taosMemoryFree(keys_list_sizes); -// taosMemoryFree(values_list); -// taosMemoryFree(values_list_sizes); - -// taosArrayDestroy(remainCols); -// } - -// taosThreadMutexUnlock(&pTsdb->lruMutex); - -// _exit: -// taosArrayDestroy(aColVal); -// taosMemoryFree(pTSchema); -// return code; -// } - static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 2ebc01775f..8b4adef57f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -629,14 +629,12 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode *pos[SL_MAX_LEVEL]; TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0); STsdbRowKey key; - TSDBROW lRow; // last row // first row tsdbRowGetKey(&tRow, &key); tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit; pTbData->minKey = TMIN(pTbData->minKey, key.key.ts); - lRow = tRow; // remain row ++tRow.iRow; @@ -653,7 +651,6 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, } if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit; - lRow = tRow; ++tRow.iRow; } @@ -664,7 +661,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, } if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData, &lRow); + tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData); } // SMemTable @@ -688,7 +685,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode *pos[SL_MAX_LEVEL]; TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version}; int32_t iRow = 0; - TSDBROW lRow; // backward put first data tRow.pTSRow = aRow[iRow++]; @@ -696,7 +692,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0); if (code) goto _exit; - lRow = tRow; pTbData->minKey = TMIN(pTbData->minKey, key.key.ts); @@ -717,8 +712,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1); if (code) goto _exit; - lRow = tRow; - iRow++; } } @@ -727,7 +720,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, pTbData->maxKey = key.key.ts; } if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, nRow, aRow, &lRow); + tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow); } // SMemTable diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index e91f64b369..27a532d15b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -705,51 +705,25 @@ void tsdbRowClose(STSDBRowIter *pIter) { } } -static SColVal *tsdbRowColIterGetValue(STSDBRowIter *pIter) { - if (pIter->iColData == 0) { - pIter->cv = COL_VAL_VALUE( - PRIMARYKEY_TIMESTAMP_COL_ID, - ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]})); - return &pIter->cv; - } - - if (pIter->iColData <= pIter->pRow->pBlockData->nColData) { - tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv); - return &pIter->cv; - } else { - return NULL; - } -} - -static SColVal *tsdbRowColIterNext(STSDBRowIter *pIter) { - SColVal* pColVal = tsdbRowColIterGetValue(pIter); - if (pColVal) { - ++pIter->iColData; - } - return pColVal; -} - -static SColVal* tsdbRowColIterMoveTo(STSDBRowIter *pIter, int32_t iCol) { - pIter->iColData = iCol; - return tsdbRowColIterGetValue(pIter); -} - SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { if (pIter->pRow->type == TSDBROW_ROW_FMT) { return tRowIterNext(pIter->pIter); } else if (pIter->pRow->type == TSDBROW_COL_FMT) { - return tsdbRowColIterNext(pIter); - } else { - ASSERT(0); - return NULL; // suppress error report by compiler - } -} + if (pIter->iColData == 0) { + pIter->cv = COL_VAL_VALUE( + PRIMARYKEY_TIMESTAMP_COL_ID, + ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]})); + ++pIter->iColData; + return &pIter->cv; + } -SColVal *tsdbRowIterMoveTo(STSDBRowIter *pIter, int32_t iCol) { - if (pIter->pRow->type == TSDBROW_ROW_FMT) { - return tRowIterMoveTo(pIter->pIter, iCol); - } else if (pIter->pRow->type == TSDBROW_COL_FMT) { - return tsdbRowColIterMoveTo(pIter, iCol); + if (pIter->iColData <= pIter->pRow->pBlockData->nColData) { + tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv); + ++pIter->iColData; + return &pIter->cv; + } else { + return NULL; + } } else { ASSERT(0); return NULL; // suppress error report by compiler From b79dda6689ebb1cbb594ac16aa0c55452eed6b48 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Fri, 17 May 2024 11:29:20 +0800 Subject: [PATCH 3/4] enh: adjust order for row format cache update --- source/dnode/vnode/src/tsdb/tsdbCache.c | 48 ++++++++++++++----------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index db099d988f..7237b5002c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1165,11 +1165,29 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 ctxArray = taosArrayInit(nCol, sizeof(SLastUpdateCtx)); iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); - for (int32_t iCol = 0; iCol < nCol; ++iCol) { - tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0); - } - for (int32_t iRow = nRow - 1; iRow >= 0; --iRow) { + // 1. prepare by lrow + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(&lRow, &tsdbRowKey); + + STSDBRowIter iter = {0}; + tsdbRowIterOpen(&iter, &lRow, pTSchema); + int32_t iCol = 0; + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + taosArrayPush(ctxArray, &updateCtx); + + if (!COL_VAL_IS_VALUE(pColVal)) { + tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0); + continue; + } + updateCtx.lflag = LFLAG_LAST; + taosArrayPush(ctxArray, &updateCtx); + } + tsdbRowClose(&iter); + + // 2. prepare by the other rows + for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) { if (tSimpleHashGetSize(iColHash) == 0) { break; } @@ -1194,18 +1212,6 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 } } - // 2. prepare last row - STsdbRowKey tsdbRowKey = {0}; - tsdbRowGetKey(&lRow, &tsdbRowKey); - - STSDBRowIter iter = {0}; - tsdbRowIterOpen(&iter, &lRow, pTSchema); - for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { - SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; - taosArrayPush(ctxArray, &updateCtx); - } - tsdbRowClose(&iter); - // 3. do update tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); @@ -1237,12 +1243,12 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0); for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) { - for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) { - SColData *pColData = &pBlockData->aColData[iColData]; - if ((pColData->flag & HAS_VALUE) == 0) { - continue; - } + SColData *pColData = &pBlockData->aColData[iColData]; + if ((pColData->flag & HAS_VALUE) != HAS_VALUE) { + continue; + } + for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) { STsdbRowKey tsdbRowKey = {0}; tsdbRowGetKey(&tRow, &tsdbRowKey); From 9367be016dae6c7ba297c0c8dbebc4f6e38f18b8 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Fri, 17 May 2024 11:37:30 +0800 Subject: [PATCH 4/4] enh: remove unused code --- include/common/tdataformat.h | 1 - source/common/src/tdataformat.c | 19 ++----------------- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 4a978dd34c..ce9b95522a 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -142,7 +142,6 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc); int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); void tRowIterClose(SRowIter **ppIter); SColVal *tRowIterNext(SRowIter *pIter); -SColVal *tRowIterMoveTo(SRowIter *pIter, int32_t iTColum); // STag ================================ int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 48ab4f0d4e..ae3fe6a2a0 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -774,7 +774,7 @@ void tRowIterClose(SRowIter **ppIter) { *ppIter = NULL; } -static SColVal *tRowIterGetValue(SRowIter *pIter) { +SColVal *tRowIterNext(SRowIter *pIter) { if (pIter->iTColumn >= pIter->pTSchema->numOfCols) { return NULL; } @@ -896,25 +896,10 @@ static SColVal *tRowIterGetValue(SRowIter *pIter) { } _exit: + pIter->iTColumn++; return &pIter->cv; } -SColVal* tRowIterNext(SRowIter *pIter) { - SColVal* pColVal = tRowIterGetValue(pIter); - if (pColVal) { - pIter->iTColumn++; - } - return pColVal; -} - -SColVal *tRowIterMoveTo(SRowIter *pIter, int32_t iTColum) { - pIter->iTColumn = iTColum; - if (pIter->iTColumn < 0 || pIter->iTColumn >= pIter->pTSchema->numOfCols) { - return NULL; - } - return tRowIterGetValue(pIter); -} - static int32_t tRowNoneUpsertColData(SColData *aColData, int32_t nColData, int32_t flag) { int32_t code = 0;