From b649a73a19d52004866cbe06435cf2956779cac4 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Wed, 3 Apr 2024 09:34:56 +0800 Subject: [PATCH] feat: pk insert/drop last cache --- source/dnode/vnode/src/inc/tsdb.h | 8 +- source/dnode/vnode/src/inc/vnodeInt.h | 4 +- source/dnode/vnode/src/meta/metaTable.c | 16 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 367 +++++++++++++++------ source/dnode/vnode/src/tsdb/tsdbMemTable.c | 12 +- 5 files changed, 279 insertions(+), 128 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a574583561..a082d33a02 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -895,11 +895,17 @@ typedef enum { } EExecMode; typedef struct { - TSKEY ts; + SRowKey rowKey; int8_t dirty; SColVal colVal; } SLastCol; +typedef struct { + TSKEY ts; + int8_t dirty; + SColVal colVal; +} SLastColV1; + int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d564c5a36e..30b7e685a1 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -234,9 +234,9 @@ int32_t tsdbCacheNewTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapp int32_t tsdbCacheDropTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow); int32_t tsdbCacheDropSubTables(STsdb* pTsdb, SArray* uids, tb_uid_t suid); int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type); -int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type); +int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, bool hasPrimayKey); int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); -int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); +int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 3c032f193a..17adf80f06 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -449,18 +449,20 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { tsdbCacheNewSTableColumn(pTsdb, uids, cid, col_type); } else if (deltaCol == -1) { int16_t cid = -1; - int8_t col_type = -1; + bool hasPrimaryKey = false; + if (onCols >= 2) { + hasPrimaryKey = (oStbEntry.stbEntry.schemaRow.pSchema[1].flags & COL_IS_KEY) ? true : false; + } for (int i = 0, j = 0; i < nCols && j < onCols; ++i, ++j) { if (pReq->schemaRow.pSchema[i].colId != oStbEntry.stbEntry.schemaRow.pSchema[j].colId) { cid = oStbEntry.stbEntry.schemaRow.pSchema[j].colId; - col_type = oStbEntry.stbEntry.schemaRow.pSchema[j].type; break; } } if (cid != -1) { metaGetSubtables(pMeta, pReq->suid, uids); - tsdbCacheDropSTableColumn(pTsdb, uids, cid, col_type); + tsdbCacheDropSTableColumn(pTsdb, uids, cid, hasPrimaryKey); } } if (uids) taosArrayDestroy(uids); @@ -1478,6 +1480,11 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl terrno = TSDB_CODE_VND_COL_SUBSCRIBED; goto _err; } + bool hasPrimayKey = false; + if (pSchema->nCols >= 2) { + hasPrimayKey = pSchema->pSchema[1].flags & COL_IS_KEY ? true : false; + } + pSchema->version++; tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema); if (tlen) { @@ -1489,9 +1496,8 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { int16_t cid = pColumn->colId; - int8_t col_type = pColumn->type; - (void)tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type); + (void)tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, hasPrimayKey); } break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index c293b63f5d..977ef177d2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -127,12 +127,25 @@ static void tsdbClosePgCache(STsdb *pTsdb) { #define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t)) +enum { + LFLAG_LAST_ROW = 0, + LFLAG_LAST = 1, + LFLAG_PRIMARY_KEY = (1 << 4), +}; + typedef struct { tb_uid_t uid; int16_t cid; - int8_t ltype; + int8_t lflag; } SLastKey; +#define LAST_COL_VERSION_BASE (((int64_t)(0x1)) << 63) +#define LAST_COL_VERSION (LAST_COL_VERSION_BASE + 2) + +#define HAS_PRIMARY_KEY(k) (((k).lflag & LFLAG_PRIMARY_KEY) == LFLAG_PRIMARY_KEY) +#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW) +#define IS_LAST_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST) + static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { SVnode *pVnode = pTsdb->pVnode; vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN); @@ -167,9 +180,9 @@ static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t return 1; } - if (lhs->ltype < rhs->ltype) { + if (lhs->lflag < rhs->lflag) { return -1; - } else if (lhs->ltype > rhs->ltype) { + } else if (lhs->lflag > rhs->lflag) { return 1; } @@ -322,16 +335,62 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { } } -static SLastCol *tsdbCacheDeserialize(char const *value) { +// note: new object do not own colVal's resource, just copy the pointer +static SLastCol *tsdbCacheConvertLastColV1(SLastColV1 *pLastColV1) { + SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + if (pLastCol == NULL) return NULL; + pLastCol->rowKey.ts = pLastColV1->ts; + pLastCol->rowKey.numOfPKs = 0; + pLastCol->dirty = pLastColV1->dirty; + pLastCol->colVal = pLastColV1->colVal; + + return pLastCol; +} + +static SLastCol *tsdbCacheDeserializeV1(char const *value) { if (!value) { return NULL; } - SLastCol *pLastCol = (SLastCol *)value; - SColVal *pColVal = &pLastCol->colVal; + SLastColV1 *pLastColV1 = (SLastColV1 *)value; + SColVal *pColVal = &pLastColV1->colVal; if (IS_VAR_DATA_TYPE(pColVal->value.type)) { if (pColVal->value.nData > 0) { - pColVal->value.pData = (char *)value + sizeof(*pLastCol); + pColVal->value.pData = (char *)value + sizeof(*pLastColV1); + } else { + pColVal->value.pData = NULL; + } + } + + return tsdbCacheConvertLastColV1(pLastColV1); +} + +static SLastCol *tsdbCacheDeserializeV2(char const *value) { + if (!value) { + return NULL; + } + + SLastCol *pLastCol = taosMemoryMalloc(sizeof(SLastCol)); + *pLastCol = *(SLastCol *)(value); + + char* currentPos = (char *)value + sizeof(*pLastCol); + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + SValue* pValue = &pLastCol->rowKey.pks[i]; + if (IS_VAR_DATA_TYPE(pValue->type)) { + if (pValue->nData > 0) { + pValue->pData = currentPos; + currentPos += pValue->nData; + } else { + pValue->pData = NULL; + } + } + } + + SColVal *pColVal = &pLastCol->colVal; + if (IS_VAR_DATA_TYPE(pColVal->value.type)) { + if (pColVal->value.nData > 0) { + pColVal->value.pData = currentPos; + currentPos += pColVal->value.nData; } else { pColVal->value.pData = NULL; } @@ -340,25 +399,68 @@ static SLastCol *tsdbCacheDeserialize(char const *value) { return pLastCol; } +static SLastCol *tsdbCacheDeserialize(char const *value) { + if (!value) { + return NULL; + } + + bool hasVersion = ((*(int64_t *)value) & LAST_COL_VERSION_BASE) == LAST_COL_VERSION_BASE; + if (!hasVersion) { + return tsdbCacheDeserializeV1(value); + } + return tsdbCacheDeserializeV2(value + sizeof(int64_t)); +} + +static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) { + ASSERT(from->nData >= 0); + if (from->nData > 0) { + memcpy(to->pData, from->pData, from->nData); + } + to->type = from->type; + to->nData = from->nData; + return from->nData; +} + static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { SColVal *pColVal = &pLastCol->colVal; - size_t length = sizeof(*pLastCol); + size_t length = sizeof(int64_t) + sizeof(*pLastCol); + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) { + length += pLastCol->rowKey.pks[i].nData; + } + } if (IS_VAR_DATA_TYPE(pColVal->value.type)) { length += pColVal->value.nData; } - *value = taosMemoryMalloc(length); - *(SLastCol *)(*value) = *pLastCol; - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - uint8_t *pVal = pColVal->value.pData; - SColVal *pDColVal = &((SLastCol *)(*value))->colVal; - pDColVal->value.pData = *value + sizeof(*pLastCol); - if (pColVal->value.nData > 0) { - memcpy(pDColVal->value.pData, pVal, pColVal->value.nData); - } else { - pDColVal->value.pData = NULL; + // set version + *value = taosMemoryMalloc(length); + char *currentPos = *value; + *(int64_t *)currentPos = LAST_COL_VERSION; + currentPos += sizeof(int64_t); + + // copy last col + SLastCol* pToLastCol = (SLastCol *)currentPos; + *pToLastCol = *pLastCol; + currentPos += sizeof(*pLastCol); + + // copy var data pks + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + SValue *pFromValue = &pLastCol->rowKey.pks[i]; + if (IS_VAR_DATA_TYPE(pFromValue->type)) { + SValue *pToValue = &pToLastCol->rowKey.pks[i]; + pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos; + currentPos += tsdbCacheCopyVarData(pFromValue, pToValue); } } + + // copy var data value + if (IS_VAR_DATA_TYPE(pColVal->value.type)) { + SValue *pFromValue = &pColVal->value; + SValue *pToValue = &pToLastCol->colVal.value; + pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos; + currentPos += tsdbCacheCopyVarData(pFromValue, pToValue); + } *size = length; } @@ -459,13 +561,16 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud taosMemoryFree(value); } -static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) { +static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) { int32_t code = 0; SLRUCache *pCache = pTsdb->lruCache; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; - SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1}; - SLastCol *pLastCol = &noneCol; + SRowKey noneRowKey = {0}; + noneRowKey.ts = TSKEY_MIN; + noneRowKey.numOfPKs = 0; + SLastCol noneCol = {.rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1}; + SLastCol *pLastCol = &noneCol; SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); *pTmpLastCol = *pLastCol; @@ -477,7 +582,7 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i charge += pLastCol->colVal.value.nData; } - SLastKey *pLastKey = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid}; LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); if (status != TAOS_LRU_STATUS_OK) { @@ -519,7 +624,7 @@ int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) { return code; } -static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) { +static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) { int32_t code = 0; // build keys & multi get from rocks @@ -527,9 +632,11 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t)); const size_t klen = ROCKS_KEY_LEN; + int8_t lflag = hasPrimaryKey ? LFLAG_PRIMARY_KEY : 0; + char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); - ((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid}; - ((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid}; keys_list[0] = keys; keys_list[1] = keys + sizeof(SLastKey); @@ -588,6 +695,8 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, if (erase) { taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen); } + + taosMemoryFree(pLastCol); } taosMemoryFree(keys_list[0]); @@ -606,13 +715,18 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap taosThreadMutexLock(&pTsdb->lruMutex); if (suid < 0) { - int nCols = pSchemaRow->nCols; + int8_t lflag = 0; + int nCols = pSchemaRow->nCols; + if (nCols >= 2) { + lflag = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0; + } + for (int i = 0; i < nCols; ++i) { int16_t cid = pSchemaRow->pSchema[i].colId; int8_t col_type = pSchemaRow->pSchema[i].type; - (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); - (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST_ROW); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST); } } else { STSchema *pTSchema = NULL; @@ -622,13 +736,18 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap return -1; } - int nCols = pTSchema->numOfCols; + int8_t lflag = 0; + int nCols = pTSchema->numOfCols; + if (nCols >= 2) { + lflag = (pTSchema->columns[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0; + } + for (int i = 0; i < nCols; ++i) { int16_t cid = pTSchema->columns[i].colId; int8_t col_type = pTSchema->columns[i].type; - (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); - (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST_ROW); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST); } taosMemoryFree(pTSchema); @@ -646,14 +765,17 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra (void)tsdbCacheCommitNoLock(pTsdb); - if (suid < 0) { - int nCols = pSchemaRow->nCols; + if (pSchemaRow != NULL) { + bool hasPrimayKey = false; + int nCols = pSchemaRow->nCols; + if (nCols >= 2) { + hasPrimayKey = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? true : false; + } for (int i = 0; i < nCols; ++i) { int16_t cid = pSchemaRow->pSchema[i].colId; int8_t col_type = pSchemaRow->pSchema[i].type; - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); } } else { STSchema *pTSchema = NULL; @@ -663,13 +785,16 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra return -1; } - int nCols = pTSchema->numOfCols; + bool hasPrimayKey = false; + int nCols = pTSchema->numOfCols; + if (nCols >= 2) { + hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false; + } for (int i = 0; i < nCols; ++i) { int16_t cid = pTSchema->columns[i].colId; int8_t col_type = pTSchema->columns[i].type; - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); } taosMemoryFree(pTSchema); @@ -698,13 +823,17 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) { for (int i = 0; i < TARRAY_SIZE(uids); ++i) { int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i]; - int nCols = pTSchema->numOfCols; + bool hasPrimayKey = false; + int nCols = pTSchema->numOfCols; + if (nCols >= 2) { + hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false; + } + for (int i = 0; i < nCols; ++i) { int16_t cid = pTSchema->columns[i].colId; int8_t col_type = pTSchema->columns[i].type; - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); } } @@ -732,15 +861,14 @@ int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t return code; } -int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) { +int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) { int32_t code = 0; taosThreadMutexLock(&pTsdb->lruMutex); (void)tsdbCacheCommitNoLock(pTsdb); - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); rocksMayWrite(pTsdb, true, false, true); @@ -768,7 +896,7 @@ int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t return code; } -int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) { +int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) { int32_t code = 0; taosThreadMutexLock(&pTsdb->lruMutex); @@ -778,8 +906,7 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_ for (int i = 0; i < TARRAY_SIZE(uids); ++i) { int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i]; - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); } rocksMayWrite(pTsdb, true, false, true); @@ -794,6 +921,58 @@ typedef struct { SLastKey key; } SIdxKey; +static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal *pColVal) { + uint8_t *pVal = NULL; + int nData = 0; + + // update rowkey + pLastCol->rowKey.ts = pRowKey->ts; + pLastCol->rowKey.numOfPKs = pRowKey->numOfPKs; + for (int8_t i = 0; i < pRowKey->numOfPKs; i++) { + SValue *pPKValue = &pLastCol->rowKey.pks[i]; + SValue *pNewPKValue = &pRowKey->pks[i]; + + if (IS_VAR_DATA_TYPE(pPKValue->type)) { + pVal = pPKValue->pData; + nData = pPKValue->nData; + } + *pPKValue = *pNewPKValue; + if (IS_VAR_DATA_TYPE(pPKValue->type)) { + if (nData < pPKValue->nData) { + taosMemoryFree(pVal); + pPKValue->pData = taosMemoryCalloc(1, pNewPKValue->nData); + } else { + pPKValue->pData = pVal; + } + if (pNewPKValue->nData) { + memcpy(pPKValue->pData, pNewPKValue->pData, pNewPKValue->nData); + } + } + } + + // update colval + if (IS_VAR_DATA_TYPE(pColVal->value.type)) { + nData = pLastCol->colVal.value.nData; + pVal = pLastCol->colVal.value.pData; + } + pLastCol->colVal = *pColVal; + if (IS_VAR_DATA_TYPE(pColVal->value.type)) { + if (nData < pColVal->value.nData) { + taosMemoryFree(pVal); + pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData); + } else { + pLastCol->colVal.value.pData = pVal; + } + if (pColVal->value.nData) { + memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + } + + if (!pLastCol->dirty) { + pLastCol->dirty = 1; + } +} + int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t code = 0; @@ -821,46 +1000,28 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow // 3, build keys & multi get from rocks int num_keys = TARRAY_SIZE(aColVal); - TSKEY keyTs = TSDBROW_TS(pRow); SArray *remainCols = NULL; SLRUCache *pCache = pTsdb->lruCache; + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pRow, &tsdbRowKey); + SRowKey *pRowKey = &tsdbRowKey.key; + int8_t lflag = (pRowKey->numOfPKs != 0) ? LFLAG_PRIMARY_KEY : 0; + taosThreadMutexLock(&pTsdb->lruMutex); for (int i = 0; i < num_keys; ++i) { SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); int16_t cid = pColVal->cid; - SLastKey *key = &(SLastKey){.ltype = 0, .uid = uid, .cid = cid}; + SLastKey *key = &(SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid}; size_t klen = ROCKS_KEY_LEN; LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); if (h) { SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - if (pLastCol->ts <= keyTs) { - uint8_t *pVal = NULL; - int nData = pLastCol->colVal.value.nData; - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - pVal = pLastCol->colVal.value.pData; - } - pLastCol->ts = keyTs; - pLastCol->colVal = *pColVal; - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - if (nData < pColVal->value.nData) { - taosMemoryFree(pVal); - pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData); - } else { - pLastCol->colVal.value.pData = pVal; - } - if (pColVal->value.nData) { - memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); - } - } - - if (!pLastCol->dirty) { - pLastCol->dirty = 1; - } + if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { + tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); } - taosLRUCacheRelease(pCache, h, false); } else { if (!remainCols) { @@ -870,36 +1031,14 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow } if (COL_VAL_IS_VALUE(pColVal)) { - key->ltype = 1; + key->lflag = lflag | LFLAG_LAST; LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); if (h) { SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - if (pLastCol->ts <= keyTs) { - uint8_t *pVal = NULL; - int nData = pLastCol->colVal.value.nData; - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - pVal = pLastCol->colVal.value.pData; - } - pLastCol->ts = keyTs; - pLastCol->colVal = *pColVal; - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - if (nData < pColVal->value.nData) { - taosMemoryFree(pVal); - pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData); - } else { - pLastCol->colVal.value.pData = pVal; - } - if (pColVal->value.nData) { - memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); - } - } - - if (!pLastCol->dirty) { - pLastCol->dirty = 1; - } + if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { + tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); } - taosLRUCacheRelease(pCache, h, false); } else { if (!remainCols) { @@ -943,11 +1082,11 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); - if (idxKey->key.ltype == 0) { - if (NULL == pLastCol || pLastCol->ts <= keyTs) { + if (IS_LAST_ROW_KEY(idxKey->key)) { + if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { char *value = NULL; size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); + tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen); // SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; taosThreadMutexLock(&pTsdb->rCache.rMutex); @@ -976,10 +1115,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow } } else { if (COL_VAL_IS_VALUE(pColVal)) { - if (NULL == pLastCol || pLastCol->ts <= keyTs) { + if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { char *value = NULL; size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); + tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen); // SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; taosThreadMutexLock(&pTsdb->rCache.rMutex); @@ -1007,6 +1146,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosMemoryFree(value); } } + + taosMemoryFree(pLastCol); } rocksdb_free(values_list[i]); @@ -1409,6 +1550,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA taosArraySet(pLastArray, idxKey->idx, &lastCol); taosArrayRemove(remainCols, j); + taosMemoryFree(pLastCol); taosMemoryFree(values_list[i]); } else { ++j; @@ -1517,12 +1659,18 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); const size_t klen = ROCKS_KEY_LEN; + + int8_t lflag = 0; + if (num_keys >= 2) { + lflag = (pTSchema->columns[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0; + } + for (int i = 0; i < num_keys; ++i) { int16_t cid = pTSchema->columns[i].colId; char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); - ((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid}; - ((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid}; keys_list[i] = keys; keys_list[num_keys + i] = keys + sizeof(SLastKey); @@ -1554,15 +1702,16 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE for (int i = 0; i < num_keys; ++i) { SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); taosThreadMutexLock(&pTsdb->rCache.rMutex); - if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { + if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[i], klen); } pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); - if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { + if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + taosMemoryFree(pLastCol); rocksdb_free(values_list[i]); rocksdb_free(values_list[i + num_keys]); @@ -1575,7 +1724,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE if (pLastCol->dirty) { pLastCol->dirty = 0; } - if (pLastCol->ts <= eKey && pLastCol->ts >= sKey) { + if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) { erase = true; } taosLRUCacheRelease(pTsdb->lruCache, h, erase); @@ -1591,7 +1740,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE if (pLastCol->dirty) { pLastCol->dirty = 0; } - if (pLastCol->ts <= eKey && pLastCol->ts >= sKey) { + if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) { erase = true; } taosLRUCacheRelease(pTsdb->lruCache, h, erase); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 8be8fa5bd7..be15a4fecf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -194,18 +194,8 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid pMemTable->nDel++; pMemTable->minVer = TMIN(pMemTable->minVer, version); pMemTable->maxVer = TMAX(pMemTable->maxVer, version); - /* - if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) { - tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey); - } - if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey); - } - */ - // if (eKey >= pTbData->maxKey && sKey <= pTbData->maxKey) { tsdbCacheDel(pTsdb, suid, uid, sKey, eKey); - //} tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 " at version %" PRId64, @@ -838,4 +828,4 @@ TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { pIter->row = pIter->pNode->row; return pIter->pRow; -} \ No newline at end of file +}