diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 071848cc0c..d54109852c 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -132,7 +132,6 @@ void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key); int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); void tsdbRowClose(STSDBRowIter *pIter); SColVal *tsdbRowIterNext(STSDBRowIter *pIter); -SColVal *tsdbRowIterMoveTo(STSDBRowIter *pIter, int32_t iCol); // SRowMerger int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pSchema); @@ -911,8 +910,8 @@ typedef struct { int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); -int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t nRow, SRow **aRow, TSDBROW *pLRow); -int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData, TSDBROW *pLRow); +int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow, SRow **aRow); +int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData); int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 1034a21487..02d4ffcf00 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1136,13 +1136,15 @@ _exit: return code; } -int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t nRow, SRow **aRow, TSDBROW *pLRow) { +int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow, + SRow **aRow) { int32_t code = 0; // 1. prepare last + TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version}; + STSchema *pTSchema = NULL; - int32_t version = TSDBROW_VERSION(pLRow); - int32_t sver = TSDBROW_SVERSION(pLRow); + int32_t sver = TSDBROW_SVERSION(&lRow); SArray *ctxArray = NULL; SSHashObj *iColHash = NULL; @@ -1179,7 +1181,7 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int3 tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal); if (COL_VAL_IS_VALUE(&colVal)) { - SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, tsdbRowKey = tsdbRowKey, .colVal = colVal}; + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal}; taosArrayPush(ctxArray, &updateCtx); tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter); } @@ -1188,12 +1190,12 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int3 // 2. prepare last row STsdbRowKey tsdbRowKey = {0}; - tsdbRowGetKey(pLRow, &tsdbRowKey); + tsdbRowGetKey(&lRow, &tsdbRowKey); STSDBRowIter iter = {0}; - tsdbRowIterOpen(&iter, pLRow, pTSchema); + tsdbRowIterOpen(&iter, &lRow, pTSchema); for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { - SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; taosArrayPush(ctxArray, &updateCtx); } tsdbRowClose(&iter); @@ -1208,11 +1210,13 @@ _exit: return code; } -int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData, TSDBROW *pLRow) { +int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) { int32_t code = 0; + TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1); + STSchema *pTSchema = NULL; - int32_t sver = TSDBROW_SVERSION(pLRow); + int32_t sver = TSDBROW_SVERSION(&lRow); SArray *ctxArray = NULL; code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); @@ -1233,10 +1237,15 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo continue; } - SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type); - tColDataGetValue(pColData, tRow.iRow, &colVal); - if (COL_VAL_IS_VALUE(&colVal)) { - SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .colVal = colVal}; + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(&tRow, &tsdbRowKey); + + uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow); + if (colType == 2) { + SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type); + tColDataGetValue(pColData, tRow.iRow, &colVal); + + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal}; taosArrayPush(ctxArray, &updateCtx); break; } @@ -1245,12 +1254,12 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo // 2. prepare last row STsdbRowKey tsdbRowKey = {0}; - tsdbRowGetKey(pLRow, &tsdbRowKey); + tsdbRowGetKey(&lRow, &tsdbRowKey); STSDBRowIter iter = {0}; - tsdbRowIterOpen(&iter, pLRow, pTSchema); + tsdbRowIterOpen(&iter, &lRow, pTSchema); for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { - SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; taosArrayPush(ctxArray, &updateCtx); } tsdbRowClose(&iter); @@ -1264,222 +1273,6 @@ _exit: return 0; } -// int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { -// int32_t code = 0; - -// // 1, fetch schema -// STSchema *pTSchema = NULL; -// int32_t sver = TSDBROW_SVERSION(pRow); - -// code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); -// if (code != TSDB_CODE_SUCCESS) { -// terrno = code; -// return -1; -// } - -// // 2, iterate col values into array -// SArray *aColVal = taosArrayInit(32, sizeof(SColVal)); - -// STSDBRowIter iter = {0}; -// tsdbRowIterOpen(&iter, pRow, pTSchema); - -// for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { -// taosArrayPush(aColVal, pColVal); -// } - -// tsdbRowClose(&iter); - -// // 3, build keys & multi get from rocks -// int num_keys = TARRAY_SIZE(aColVal); -// SArray *remainCols = NULL; -// SLRUCache *pCache = pTsdb->lruCache; - -// STsdbRowKey tsdbRowKey = {0}; -// tsdbRowGetKey(pRow, &tsdbRowKey); -// SRowKey *pRowKey = &tsdbRowKey.key; - -// taosThreadMutexLock(&pTsdb->lruMutex); -// for (int i = 0; i < num_keys; ++i) { -// SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); -// int16_t cid = pColVal->cid; - -// SLastKey *key = &(SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid}; -// size_t klen = ROCKS_KEY_LEN; -// LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); -// if (h) { -// SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); -// int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey); -// if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { -// tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); -// } -// taosLRUCacheRelease(pCache, h, false); -// } else { -// if (!remainCols) { -// remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); -// } -// taosArrayPush(remainCols, &(SIdxKey){i, *key}); -// } - -// if (COL_VAL_IS_VALUE(pColVal)) { -// key->lflag = LFLAG_LAST; -// LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); -// if (h) { -// SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); -// if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { -// tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); -// } -// taosLRUCacheRelease(pCache, h, false); -// } else { -// if (!remainCols) { -// remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); -// } -// taosArrayPush(remainCols, &(SIdxKey){i, *key}); -// } -// } -// } - -// if (remainCols) { -// num_keys = TARRAY_SIZE(remainCols); -// } -// if (remainCols && num_keys > 0) { -// char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *)); -// size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); -// for (int i = 0; i < num_keys; ++i) { -// SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; - -// keys_list[i] = (char *)&idxKey->key; -// keys_list_sizes[i] = ROCKS_KEY_LEN; -// } -// char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); -// size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); -// char **errs = taosMemoryCalloc(num_keys, sizeof(char *)); -// rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, -// keys_list_sizes, values_list, values_list_sizes, errs); -// for (int i = 0; i < num_keys; ++i) { -// rocksdb_free(errs[i]); -// } -// taosMemoryFree(errs); - -// rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; -// for (int i = 0; i < num_keys; ++i) { -// SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; -// SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx; -// // SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx); - -// SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]); -// SLastCol *PToFree = pLastCol; - -// if (IS_LAST_ROW_KEY(idxKey->key)) { -// int32_t cmp_res = 1; -// if (pLastCol) { -// cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey); -// } - -// if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { -// char *value = NULL; -// size_t vlen = 0; -// SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; -// tsdbCacheSerialize(&lastColTmp, &value, &vlen); - -// taosThreadMutexLock(&pTsdb->rCache.rMutex); - -// rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); - -// taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - -// pLastCol = &lastColTmp; -// SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); -// *pTmpLastCol = *pLastCol; -// pLastCol = pTmpLastCol; - -// size_t charge = sizeof(*pLastCol); -// for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { -// SValue *pValue = &pLastCol->rowKey.pks[i]; -// if (IS_VAR_DATA_TYPE(pValue->type)) { -// reallocVarDataVal(pValue); -// charge += pValue->nData; -// } -// } - -// if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { -// reallocVarData(&pLastCol->colVal); -// charge += pLastCol->colVal.value.nData; -// } - -// LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, -// tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); -// if (status != TAOS_LRU_STATUS_OK) { -// code = -1; -// } - -// taosMemoryFree(value); -// } -// } else { -// if (COL_VAL_IS_VALUE(pColVal)) { -// if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { -// char *value = NULL; -// size_t vlen = 0; -// SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; -// tsdbCacheSerialize(&lastColTmp, &value, &vlen); - -// taosThreadMutexLock(&pTsdb->rCache.rMutex); - -// rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); - -// taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - -// pLastCol = &lastColTmp; -// SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); -// *pTmpLastCol = *pLastCol; -// pLastCol = pTmpLastCol; - -// size_t charge = sizeof(*pLastCol); -// for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { -// SValue *pValue = &pLastCol->rowKey.pks[i]; -// if (IS_VAR_DATA_TYPE(pValue->type)) { -// reallocVarDataVal(pValue); -// charge += pValue->nData; -// } -// } - -// if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { -// reallocVarData(&pLastCol->colVal); -// charge += pLastCol->colVal.value.nData; -// } - -// LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, -// tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); -// if (status != TAOS_LRU_STATUS_OK) { -// code = -1; -// } - -// taosMemoryFree(value); -// } -// } -// } - -// taosMemoryFreeClear(PToFree); -// rocksdb_free(values_list[i]); -// } - -// rocksMayWrite(pTsdb, true, false, true); - -// taosMemoryFree(keys_list); -// taosMemoryFree(keys_list_sizes); -// taosMemoryFree(values_list); -// taosMemoryFree(values_list_sizes); - -// taosArrayDestroy(remainCols); -// } - -// taosThreadMutexUnlock(&pTsdb->lruMutex); - -// _exit: -// taosArrayDestroy(aColVal); -// taosMemoryFree(pTSchema); -// return code; -// } - static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 2ebc01775f..8b4adef57f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -629,14 +629,12 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode *pos[SL_MAX_LEVEL]; TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0); STsdbRowKey key; - TSDBROW lRow; // last row // first row tsdbRowGetKey(&tRow, &key); tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit; pTbData->minKey = TMIN(pTbData->minKey, key.key.ts); - lRow = tRow; // remain row ++tRow.iRow; @@ -653,7 +651,6 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, } if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit; - lRow = tRow; ++tRow.iRow; } @@ -664,7 +661,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, } if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData, &lRow); + tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData); } // SMemTable @@ -688,7 +685,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode *pos[SL_MAX_LEVEL]; TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version}; int32_t iRow = 0; - TSDBROW lRow; // backward put first data tRow.pTSRow = aRow[iRow++]; @@ -696,7 +692,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0); if (code) goto _exit; - lRow = tRow; pTbData->minKey = TMIN(pTbData->minKey, key.key.ts); @@ -717,8 +712,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1); if (code) goto _exit; - lRow = tRow; - iRow++; } } @@ -727,7 +720,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, pTbData->maxKey = key.key.ts; } if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, nRow, aRow, &lRow); + tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow); } // SMemTable diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index e91f64b369..27a532d15b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -705,51 +705,25 @@ void tsdbRowClose(STSDBRowIter *pIter) { } } -static SColVal *tsdbRowColIterGetValue(STSDBRowIter *pIter) { - if (pIter->iColData == 0) { - pIter->cv = COL_VAL_VALUE( - PRIMARYKEY_TIMESTAMP_COL_ID, - ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]})); - return &pIter->cv; - } - - if (pIter->iColData <= pIter->pRow->pBlockData->nColData) { - tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv); - return &pIter->cv; - } else { - return NULL; - } -} - -static SColVal *tsdbRowColIterNext(STSDBRowIter *pIter) { - SColVal* pColVal = tsdbRowColIterGetValue(pIter); - if (pColVal) { - ++pIter->iColData; - } - return pColVal; -} - -static SColVal* tsdbRowColIterMoveTo(STSDBRowIter *pIter, int32_t iCol) { - pIter->iColData = iCol; - return tsdbRowColIterGetValue(pIter); -} - SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { if (pIter->pRow->type == TSDBROW_ROW_FMT) { return tRowIterNext(pIter->pIter); } else if (pIter->pRow->type == TSDBROW_COL_FMT) { - return tsdbRowColIterNext(pIter); - } else { - ASSERT(0); - return NULL; // suppress error report by compiler - } -} + if (pIter->iColData == 0) { + pIter->cv = COL_VAL_VALUE( + PRIMARYKEY_TIMESTAMP_COL_ID, + ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]})); + ++pIter->iColData; + return &pIter->cv; + } -SColVal *tsdbRowIterMoveTo(STSDBRowIter *pIter, int32_t iCol) { - if (pIter->pRow->type == TSDBROW_ROW_FMT) { - return tRowIterMoveTo(pIter->pIter, iCol); - } else if (pIter->pRow->type == TSDBROW_COL_FMT) { - return tsdbRowColIterMoveTo(pIter, iCol); + if (pIter->iColData <= pIter->pRow->pBlockData->nColData) { + tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv); + ++pIter->iColData; + return &pIter->cv; + } else { + return NULL; + } } else { ASSERT(0); return NULL; // suppress error report by compiler