From 16c17bbf726fe092bcf50cfd592452c77d327f9f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 7 Apr 2024 14:46:22 +0800 Subject: [PATCH 1/6] fix(tsdb): fix error in tsdbread. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 4 +-- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 32 +++++++++---------- source/libs/executor/src/timewindowoperator.c | 3 +- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index a12575f1c6..9fcc10e396 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -114,10 +114,10 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { if (p1->pks[0].nData == p2->pks[0].nData) { return 0; } else { - return p1->pks[0].nData > p2->pks[0].nData?1:-1; + return p1->pks[0].nData > p2->pks[0].nData ? 1 : -1; } } else { - return ret > 0? 1:-1; + return ret > 0 ? 1 : -1; } } else { return comparFn(&p1->pks[0].val, &p2->pks[0].val); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 48a6b9a7a8..2a7b0140df 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -992,39 +992,39 @@ static bool overlapWithTimeWindow(STimeWindow* p1, STimeWindow* pQueryWindow, ST } static int32_t sortUidComparFn(const void* p1, const void* p2) { - const STimeWindow* px1 = p1; - const STimeWindow* px2 = p2; - if (px1->skey == px2->skey) { - return 0; - } else { - return px1->skey < px2->skey ? -1 : 1; - } + const SSttKeyRange* px1 = p1; + const SSttKeyRange* px2 = p2; + + int32_t ret = tRowKeyCompare(&px1, px2); + return ret; } -bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, +bool isCleanSttBlock(SArray* pKeyRangeList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order) { // check if it overlap with del skyline - taosArraySort(pTimewindowList, sortUidComparFn); + taosArraySort(pKeyRangeList, sortUidComparFn); - int32_t num = taosArrayGetSize(pTimewindowList); + int32_t num = taosArrayGetSize(pKeyRangeList); if (num == 0) { return false; } - STimeWindow* p = taosArrayGet(pTimewindowList, 0); - if (overlapWithTimeWindow(p, pQueryWindow, pScanInfo, order)) { + SSttKeyRange* pRange = taosArrayGet(pKeyRangeList, 0); + STimeWindow w = {.skey = pRange->skey.ts, .ekey = pRange->ekey.ts}; + if (overlapWithTimeWindow(&w, pQueryWindow, pScanInfo, order)) { return false; } for (int32_t i = 0; i < num - 1; ++i) { - STimeWindow* p1 = taosArrayGet(pTimewindowList, i); - STimeWindow* p2 = taosArrayGet(pTimewindowList, i + 1); + SSttKeyRange* p1 = taosArrayGet(pKeyRangeList, i); + SSttKeyRange* p2 = taosArrayGet(pKeyRangeList, i + 1); - if (p1->ekey >= p2->skey) { + if (p1->ekey.ts >= p2->skey.ts) { return false; } - bool overlap = overlapWithTimeWindow(p2, pQueryWindow, pScanInfo, order); + STimeWindow w2 = {.skey = p2->skey.ts, .ekey = p2->ekey.ts}; + bool overlap = overlapWithTimeWindow(&w2, pQueryWindow, pScanInfo, order); if (overlap) { return false; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index f24b581ca2..8fb8aaa69d 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -520,9 +520,8 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB // duplicated ts row does not involve in the interpolation of end value for current time window int32_t x = endRowIndex; - while(x >= 0) { + while(x > 0) { if (tsCols[x] == tsCols[x-1]) { - x -= 1; } else { endRowIndex = x; From 6174c3bfd38876494eecd299243d0d27247706b7 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 7 Apr 2024 14:54:58 +0800 Subject: [PATCH 2/6] fix:disable primary key in schemaless --- include/util/taoserror.h | 1 + source/client/src/clientSml.c | 21 +++++++ source/util/src/terror.c | 1 + utils/test/c/sml_test.c | 104 ++++++++++++++++++++++++++++++++++ 4 files changed, 127 insertions(+) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 3dc6e5333d..2389079fd2 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -806,6 +806,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SML_INVALID_DB_CONF TAOS_DEF_ERROR_CODE(0, 0x3003) #define TSDB_CODE_SML_NOT_SAME_TYPE TAOS_DEF_ERROR_CODE(0, 0x3004) #define TSDB_CODE_SML_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x3005) +#define TSDB_CODE_SML_NOT_SUPPORT_PK TAOS_DEF_ERROR_CODE(0, 0x3006) //tsma #define TSDB_CODE_TSMA_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x3100) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 6bcdb4e973..79c079f871 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -310,6 +310,16 @@ int32_t smlJoinMeasureTag(SSmlLineInfo *elements){ return TSDB_CODE_SUCCESS; } +static bool smlIsPKTable(STableMeta *pTableMeta){ + for(int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++){ + if(pTableMeta->schema[i].flags & COL_IS_KEY){ + return true; + } + } + + return false; +} + int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements) { bool isSameMeasure = IS_SAME_SUPER_TABLE; if(isSameMeasure) { @@ -328,6 +338,11 @@ int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements) { info->currSTableMeta = sMeta->tableMeta; info->maxTagKVs = sMeta->tags; info->maxColKVs = sMeta->cols; + + if(smlIsPKTable(sMeta->tableMeta)){ + terrno = TSDB_CODE_SML_NOT_SUPPORT_PK; + return -1; + } return 0; } @@ -1063,6 +1078,12 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { goto end; } } else if (code == TSDB_CODE_SUCCESS) { + + if(smlIsPKTable(pTableMeta)){ + code = TSDB_CODE_SML_NOT_SUPPORT_PK; + goto end; + } + hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); for (uint16_t i = pTableMeta->tableInfo.numOfColumns; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 64edb7faad..9436ee33f8 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -668,6 +668,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DATA, "Invalid data format TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DB_CONF, "Invalid schemaless db config") TAOS_DEFINE_ERROR(TSDB_CODE_SML_NOT_SAME_TYPE, "Not the same type like before") TAOS_DEFINE_ERROR(TSDB_CODE_SML_INTERNAL_ERROR, "Internal error") +TAOS_DEFINE_ERROR(TSDB_CODE_SML_NOT_SUPPORT_PK, "Can not insert data into table with primary key") //tsma TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INIT_FAILED, "Tsma init failed") diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index 01619decc5..fa76f829eb 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1841,12 +1841,116 @@ int sml_td18789_Test() { return code; } +int sml_td29373_Test() { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + + TAOS_RES *pRes = taos_query(taos, "drop database if exists td29373"); + taos_free_result(pRes); + + pRes = taos_query(taos, "create database if not exists td29373"); + taos_free_result(pRes); + + pRes = taos_query(taos, "use td29373"); + taos_free_result(pRes); + + pRes = taos_query(taos, "create table pktable (ts timestamp, f1 int primary key, f2 binary(10)) tags (t1 int)"); + taos_free_result(pRes); + + // case 1 + const char *sql[] = { + "pktable,t1=1 f1=283i32,f2=b\"hello\" 1632299372000", + "pktable,t1=2 f1=232i32,f2=b\"he3llo\" 1632299373000", + }; + + pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_MILLI_SECONDS); + + int code = taos_errno(pRes); + printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); + ASSERT(code == TSDB_CODE_SML_NOT_SUPPORT_PK); + taos_free_result(pRes); + + // case 2 + const char *sql1[] = { + "pktable,t1=2 f2=b\"he3llo\",f1=232i32 1632299373000", + "pktable,t1=1 f1=283i32,f2=b\"hello\" 1632299372000" + }; + + pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_MILLI_SECONDS); + + code = taos_errno(pRes); + printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); + ASSERT(code == TSDB_CODE_SML_NOT_SUPPORT_PK); + taos_free_result(pRes); + + // case 3 + pRes = taos_query(taos, "create table pktablejson (ts timestamp, f1 int primary key, f2 binary(10)) tags (`host` varchar(8), dc varchar(8))"); + taos_free_result(pRes); + const char *sql2[] = { "" + "[\n" + " {\n" + " \"metric\": \"pktablejson\",\n" + " \"timestamp\": 1346846400001,\n" + " \"value\": 18,\n" + " \"tags\": {\n" + " \"host\": \"web01\",\n" + " \"dc\": \"lga\"\n" + " }\n" + " },\n" + " {\n" + " \"metric\": \"pktablejson\",\n" + " \"timestamp\": 1346846400002,\n" + " \"value\": 9,\n" + " \"tags\": {\n" + " \"host\": \"web02\",\n" + " \"dc\": \"lga\"\n" + " }\n" + " }\n" + "]" + }; + char *sql3[1] = {0}; + for (int i = 0; i < 1; i++) { + sql3[i] = taosMemoryCalloc(1, 1024); + strncpy(sql3[i], sql2[i], 1023); + } + + pRes = taos_schemaless_insert(taos, (char **)sql3, sizeof(sql3) / sizeof(sql3[0]), TSDB_SML_JSON_PROTOCOL, + TSDB_SML_TIMESTAMP_MILLI_SECONDS); + + code = taos_errno(pRes); + printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); + ASSERT(code == TSDB_CODE_SML_NOT_SUPPORT_PK); + taos_free_result(pRes); + + + // case 4 + const char *sql4[] = { + "pktablejson 1479496100 1.3E0 host=web01 dc=eth0", + "pktablejson 1479496100 1.2E0 dc=web01 host=eth0", + }; + + pRes = taos_schemaless_insert(taos, (char **)sql4, sizeof(sql4) / sizeof(sql4[0]), TSDB_SML_TELNET_PROTOCOL, + TSDB_SML_TIMESTAMP_MILLI_SECONDS); + + code = taos_errno(pRes); + printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); + ASSERT(code == TSDB_CODE_SML_NOT_SUPPORT_PK); + taos_free_result(pRes); + + taos_close(taos); + + return code; +} + int main(int argc, char *argv[]) { if (argc == 2) { taos_options(TSDB_OPTION_CONFIGDIR, argv[1]); } int ret = 0; + ret = sml_td29373_Test(); + ASSERT(!ret); ret = sml_td24559_Test(); ASSERT(!ret); ret = sml_td18789_Test(); From 64280a10ccb7b64bf038f52dec2c605c697401b1 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 7 Apr 2024 15:04:34 +0800 Subject: [PATCH 3/6] fix:disable primary key in schemaless --- utils/test/c/sml_test.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index fa76f829eb..bf37e7a182 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1950,7 +1950,7 @@ int main(int argc, char *argv[]) { int ret = 0; ret = sml_td29373_Test(); - ASSERT(!ret); + ASSERT(ret); ret = sml_td24559_Test(); ASSERT(!ret); ret = sml_td18789_Test(); From 96fc85017b68de17e11b360f44edd7fa384dbe39 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 7 Apr 2024 16:51:40 +0800 Subject: [PATCH 4/6] fix(query): fix error in interp --- source/libs/executor/src/timesliceoperator.c | 21 +++----------------- 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 9b28a203b8..080fd6b914 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -206,28 +206,14 @@ static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumn cur.pks[0].type = pPkCol->info.type; } + // let's discard the duplicated ts if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevKey.ts)) { -// if (pPkCol == NULL) { - return true; - /* } else { - tRowGetKeyFromColData(currentTs, pPkCol, curIndex, &cur); - if (tRowKeyCompare(&cur, &pSliceInfo->prevKey) == 0) { - return true; - } - }*/ + return true; } pSliceInfo->prevTsSet = true; tRowKeyAssign(&pSliceInfo->prevKey, &cur); - // todo handle next - if (currentTs == pSliceInfo->win.ekey && curIndex < rows - 1) { - int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1); - if (currentTs == nextTs) { - return true; - } - } - return false; } @@ -735,7 +721,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS // check for duplicate timestamps if (checkDuplicateTimestamps(pSliceInfo, pTsCol, pPkCol, i, pBlock->info.rows)) { continue; -// T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP); } if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) { @@ -754,6 +739,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS if (checkWindowBoundReached(pSliceInfo)) { break; } + if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { saveBlockStatus(pSliceInfo, pBlock, i); return; @@ -828,7 +814,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS // if reached here, meaning block processing finished naturally, // or interpolation reach window upper bound pSliceInfo->pRemainRes = NULL; - } static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) { From b649a73a19d52004866cbe06435cf2956779cac4 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Wed, 3 Apr 2024 09:34:56 +0800 Subject: [PATCH 5/6] feat: pk insert/drop last cache --- source/dnode/vnode/src/inc/tsdb.h | 8 +- source/dnode/vnode/src/inc/vnodeInt.h | 4 +- source/dnode/vnode/src/meta/metaTable.c | 16 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 367 +++++++++++++++------ source/dnode/vnode/src/tsdb/tsdbMemTable.c | 12 +- 5 files changed, 279 insertions(+), 128 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a574583561..a082d33a02 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -895,11 +895,17 @@ typedef enum { } EExecMode; typedef struct { - TSKEY ts; + SRowKey rowKey; int8_t dirty; SColVal colVal; } SLastCol; +typedef struct { + TSKEY ts; + int8_t dirty; + SColVal colVal; +} SLastColV1; + int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d564c5a36e..30b7e685a1 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -234,9 +234,9 @@ int32_t tsdbCacheNewTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapp int32_t tsdbCacheDropTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow); int32_t tsdbCacheDropSubTables(STsdb* pTsdb, SArray* uids, tb_uid_t suid); int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type); -int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type); +int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, bool hasPrimayKey); int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); -int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); +int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 3c032f193a..17adf80f06 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -449,18 +449,20 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { tsdbCacheNewSTableColumn(pTsdb, uids, cid, col_type); } else if (deltaCol == -1) { int16_t cid = -1; - int8_t col_type = -1; + bool hasPrimaryKey = false; + if (onCols >= 2) { + hasPrimaryKey = (oStbEntry.stbEntry.schemaRow.pSchema[1].flags & COL_IS_KEY) ? true : false; + } for (int i = 0, j = 0; i < nCols && j < onCols; ++i, ++j) { if (pReq->schemaRow.pSchema[i].colId != oStbEntry.stbEntry.schemaRow.pSchema[j].colId) { cid = oStbEntry.stbEntry.schemaRow.pSchema[j].colId; - col_type = oStbEntry.stbEntry.schemaRow.pSchema[j].type; break; } } if (cid != -1) { metaGetSubtables(pMeta, pReq->suid, uids); - tsdbCacheDropSTableColumn(pTsdb, uids, cid, col_type); + tsdbCacheDropSTableColumn(pTsdb, uids, cid, hasPrimaryKey); } } if (uids) taosArrayDestroy(uids); @@ -1478,6 +1480,11 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl terrno = TSDB_CODE_VND_COL_SUBSCRIBED; goto _err; } + bool hasPrimayKey = false; + if (pSchema->nCols >= 2) { + hasPrimayKey = pSchema->pSchema[1].flags & COL_IS_KEY ? true : false; + } + pSchema->version++; tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema); if (tlen) { @@ -1489,9 +1496,8 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { int16_t cid = pColumn->colId; - int8_t col_type = pColumn->type; - (void)tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type); + (void)tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, hasPrimayKey); } break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index c293b63f5d..977ef177d2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -127,12 +127,25 @@ static void tsdbClosePgCache(STsdb *pTsdb) { #define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t)) +enum { + LFLAG_LAST_ROW = 0, + LFLAG_LAST = 1, + LFLAG_PRIMARY_KEY = (1 << 4), +}; + typedef struct { tb_uid_t uid; int16_t cid; - int8_t ltype; + int8_t lflag; } SLastKey; +#define LAST_COL_VERSION_BASE (((int64_t)(0x1)) << 63) +#define LAST_COL_VERSION (LAST_COL_VERSION_BASE + 2) + +#define HAS_PRIMARY_KEY(k) (((k).lflag & LFLAG_PRIMARY_KEY) == LFLAG_PRIMARY_KEY) +#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW) +#define IS_LAST_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST) + static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { SVnode *pVnode = pTsdb->pVnode; vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN); @@ -167,9 +180,9 @@ static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t return 1; } - if (lhs->ltype < rhs->ltype) { + if (lhs->lflag < rhs->lflag) { return -1; - } else if (lhs->ltype > rhs->ltype) { + } else if (lhs->lflag > rhs->lflag) { return 1; } @@ -322,16 +335,62 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { } } -static SLastCol *tsdbCacheDeserialize(char const *value) { +// note: new object do not own colVal's resource, just copy the pointer +static SLastCol *tsdbCacheConvertLastColV1(SLastColV1 *pLastColV1) { + SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + if (pLastCol == NULL) return NULL; + pLastCol->rowKey.ts = pLastColV1->ts; + pLastCol->rowKey.numOfPKs = 0; + pLastCol->dirty = pLastColV1->dirty; + pLastCol->colVal = pLastColV1->colVal; + + return pLastCol; +} + +static SLastCol *tsdbCacheDeserializeV1(char const *value) { if (!value) { return NULL; } - SLastCol *pLastCol = (SLastCol *)value; - SColVal *pColVal = &pLastCol->colVal; + SLastColV1 *pLastColV1 = (SLastColV1 *)value; + SColVal *pColVal = &pLastColV1->colVal; if (IS_VAR_DATA_TYPE(pColVal->value.type)) { if (pColVal->value.nData > 0) { - pColVal->value.pData = (char *)value + sizeof(*pLastCol); + pColVal->value.pData = (char *)value + sizeof(*pLastColV1); + } else { + pColVal->value.pData = NULL; + } + } + + return tsdbCacheConvertLastColV1(pLastColV1); +} + +static SLastCol *tsdbCacheDeserializeV2(char const *value) { + if (!value) { + return NULL; + } + + SLastCol *pLastCol = taosMemoryMalloc(sizeof(SLastCol)); + *pLastCol = *(SLastCol *)(value); + + char* currentPos = (char *)value + sizeof(*pLastCol); + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + SValue* pValue = &pLastCol->rowKey.pks[i]; + if (IS_VAR_DATA_TYPE(pValue->type)) { + if (pValue->nData > 0) { + pValue->pData = currentPos; + currentPos += pValue->nData; + } else { + pValue->pData = NULL; + } + } + } + + SColVal *pColVal = &pLastCol->colVal; + if (IS_VAR_DATA_TYPE(pColVal->value.type)) { + if (pColVal->value.nData > 0) { + pColVal->value.pData = currentPos; + currentPos += pColVal->value.nData; } else { pColVal->value.pData = NULL; } @@ -340,25 +399,68 @@ static SLastCol *tsdbCacheDeserialize(char const *value) { return pLastCol; } +static SLastCol *tsdbCacheDeserialize(char const *value) { + if (!value) { + return NULL; + } + + bool hasVersion = ((*(int64_t *)value) & LAST_COL_VERSION_BASE) == LAST_COL_VERSION_BASE; + if (!hasVersion) { + return tsdbCacheDeserializeV1(value); + } + return tsdbCacheDeserializeV2(value + sizeof(int64_t)); +} + +static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) { + ASSERT(from->nData >= 0); + if (from->nData > 0) { + memcpy(to->pData, from->pData, from->nData); + } + to->type = from->type; + to->nData = from->nData; + return from->nData; +} + static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { SColVal *pColVal = &pLastCol->colVal; - size_t length = sizeof(*pLastCol); + size_t length = sizeof(int64_t) + sizeof(*pLastCol); + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) { + length += pLastCol->rowKey.pks[i].nData; + } + } if (IS_VAR_DATA_TYPE(pColVal->value.type)) { length += pColVal->value.nData; } - *value = taosMemoryMalloc(length); - *(SLastCol *)(*value) = *pLastCol; - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - uint8_t *pVal = pColVal->value.pData; - SColVal *pDColVal = &((SLastCol *)(*value))->colVal; - pDColVal->value.pData = *value + sizeof(*pLastCol); - if (pColVal->value.nData > 0) { - memcpy(pDColVal->value.pData, pVal, pColVal->value.nData); - } else { - pDColVal->value.pData = NULL; + // set version + *value = taosMemoryMalloc(length); + char *currentPos = *value; + *(int64_t *)currentPos = LAST_COL_VERSION; + currentPos += sizeof(int64_t); + + // copy last col + SLastCol* pToLastCol = (SLastCol *)currentPos; + *pToLastCol = *pLastCol; + currentPos += sizeof(*pLastCol); + + // copy var data pks + for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { + SValue *pFromValue = &pLastCol->rowKey.pks[i]; + if (IS_VAR_DATA_TYPE(pFromValue->type)) { + SValue *pToValue = &pToLastCol->rowKey.pks[i]; + pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos; + currentPos += tsdbCacheCopyVarData(pFromValue, pToValue); } } + + // copy var data value + if (IS_VAR_DATA_TYPE(pColVal->value.type)) { + SValue *pFromValue = &pColVal->value; + SValue *pToValue = &pToLastCol->colVal.value; + pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos; + currentPos += tsdbCacheCopyVarData(pFromValue, pToValue); + } *size = length; } @@ -459,13 +561,16 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud taosMemoryFree(value); } -static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) { +static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) { int32_t code = 0; SLRUCache *pCache = pTsdb->lruCache; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; - SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1}; - SLastCol *pLastCol = &noneCol; + SRowKey noneRowKey = {0}; + noneRowKey.ts = TSKEY_MIN; + noneRowKey.numOfPKs = 0; + SLastCol noneCol = {.rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1}; + SLastCol *pLastCol = &noneCol; SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); *pTmpLastCol = *pLastCol; @@ -477,7 +582,7 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i charge += pLastCol->colVal.value.nData; } - SLastKey *pLastKey = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid}; LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); if (status != TAOS_LRU_STATUS_OK) { @@ -519,7 +624,7 @@ int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) { return code; } -static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) { +static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) { int32_t code = 0; // build keys & multi get from rocks @@ -527,9 +632,11 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t)); const size_t klen = ROCKS_KEY_LEN; + int8_t lflag = hasPrimaryKey ? LFLAG_PRIMARY_KEY : 0; + char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); - ((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid}; - ((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid}; keys_list[0] = keys; keys_list[1] = keys + sizeof(SLastKey); @@ -588,6 +695,8 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, if (erase) { taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen); } + + taosMemoryFree(pLastCol); } taosMemoryFree(keys_list[0]); @@ -606,13 +715,18 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap taosThreadMutexLock(&pTsdb->lruMutex); if (suid < 0) { - int nCols = pSchemaRow->nCols; + int8_t lflag = 0; + int nCols = pSchemaRow->nCols; + if (nCols >= 2) { + lflag = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0; + } + for (int i = 0; i < nCols; ++i) { int16_t cid = pSchemaRow->pSchema[i].colId; int8_t col_type = pSchemaRow->pSchema[i].type; - (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); - (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST_ROW); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST); } } else { STSchema *pTSchema = NULL; @@ -622,13 +736,18 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap return -1; } - int nCols = pTSchema->numOfCols; + int8_t lflag = 0; + int nCols = pTSchema->numOfCols; + if (nCols >= 2) { + lflag = (pTSchema->columns[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0; + } + for (int i = 0; i < nCols; ++i) { int16_t cid = pTSchema->columns[i].colId; int8_t col_type = pTSchema->columns[i].type; - (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); - (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST_ROW); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST); } taosMemoryFree(pTSchema); @@ -646,14 +765,17 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra (void)tsdbCacheCommitNoLock(pTsdb); - if (suid < 0) { - int nCols = pSchemaRow->nCols; + if (pSchemaRow != NULL) { + bool hasPrimayKey = false; + int nCols = pSchemaRow->nCols; + if (nCols >= 2) { + hasPrimayKey = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? true : false; + } for (int i = 0; i < nCols; ++i) { int16_t cid = pSchemaRow->pSchema[i].colId; int8_t col_type = pSchemaRow->pSchema[i].type; - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); } } else { STSchema *pTSchema = NULL; @@ -663,13 +785,16 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra return -1; } - int nCols = pTSchema->numOfCols; + bool hasPrimayKey = false; + int nCols = pTSchema->numOfCols; + if (nCols >= 2) { + hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false; + } for (int i = 0; i < nCols; ++i) { int16_t cid = pTSchema->columns[i].colId; int8_t col_type = pTSchema->columns[i].type; - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); } taosMemoryFree(pTSchema); @@ -698,13 +823,17 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) { for (int i = 0; i < TARRAY_SIZE(uids); ++i) { int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i]; - int nCols = pTSchema->numOfCols; + bool hasPrimayKey = false; + int nCols = pTSchema->numOfCols; + if (nCols >= 2) { + hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false; + } + for (int i = 0; i < nCols; ++i) { int16_t cid = pTSchema->columns[i].colId; int8_t col_type = pTSchema->columns[i].type; - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); } } @@ -732,15 +861,14 @@ int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t return code; } -int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) { +int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) { int32_t code = 0; taosThreadMutexLock(&pTsdb->lruMutex); (void)tsdbCacheCommitNoLock(pTsdb); - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); rocksMayWrite(pTsdb, true, false, true); @@ -768,7 +896,7 @@ int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t return code; } -int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) { +int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) { int32_t code = 0; taosThreadMutexLock(&pTsdb->lruMutex); @@ -778,8 +906,7 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_ for (int i = 0; i < TARRAY_SIZE(uids); ++i) { int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i]; - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); - (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); } rocksMayWrite(pTsdb, true, false, true); @@ -794,6 +921,58 @@ typedef struct { SLastKey key; } SIdxKey; +static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal *pColVal) { + uint8_t *pVal = NULL; + int nData = 0; + + // update rowkey + pLastCol->rowKey.ts = pRowKey->ts; + pLastCol->rowKey.numOfPKs = pRowKey->numOfPKs; + for (int8_t i = 0; i < pRowKey->numOfPKs; i++) { + SValue *pPKValue = &pLastCol->rowKey.pks[i]; + SValue *pNewPKValue = &pRowKey->pks[i]; + + if (IS_VAR_DATA_TYPE(pPKValue->type)) { + pVal = pPKValue->pData; + nData = pPKValue->nData; + } + *pPKValue = *pNewPKValue; + if (IS_VAR_DATA_TYPE(pPKValue->type)) { + if (nData < pPKValue->nData) { + taosMemoryFree(pVal); + pPKValue->pData = taosMemoryCalloc(1, pNewPKValue->nData); + } else { + pPKValue->pData = pVal; + } + if (pNewPKValue->nData) { + memcpy(pPKValue->pData, pNewPKValue->pData, pNewPKValue->nData); + } + } + } + + // update colval + if (IS_VAR_DATA_TYPE(pColVal->value.type)) { + nData = pLastCol->colVal.value.nData; + pVal = pLastCol->colVal.value.pData; + } + pLastCol->colVal = *pColVal; + if (IS_VAR_DATA_TYPE(pColVal->value.type)) { + if (nData < pColVal->value.nData) { + taosMemoryFree(pVal); + pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData); + } else { + pLastCol->colVal.value.pData = pVal; + } + if (pColVal->value.nData) { + memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + } + + if (!pLastCol->dirty) { + pLastCol->dirty = 1; + } +} + int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t code = 0; @@ -821,46 +1000,28 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow // 3, build keys & multi get from rocks int num_keys = TARRAY_SIZE(aColVal); - TSKEY keyTs = TSDBROW_TS(pRow); SArray *remainCols = NULL; SLRUCache *pCache = pTsdb->lruCache; + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pRow, &tsdbRowKey); + SRowKey *pRowKey = &tsdbRowKey.key; + int8_t lflag = (pRowKey->numOfPKs != 0) ? LFLAG_PRIMARY_KEY : 0; + taosThreadMutexLock(&pTsdb->lruMutex); for (int i = 0; i < num_keys; ++i) { SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); int16_t cid = pColVal->cid; - SLastKey *key = &(SLastKey){.ltype = 0, .uid = uid, .cid = cid}; + SLastKey *key = &(SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid}; size_t klen = ROCKS_KEY_LEN; LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); if (h) { SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - if (pLastCol->ts <= keyTs) { - uint8_t *pVal = NULL; - int nData = pLastCol->colVal.value.nData; - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - pVal = pLastCol->colVal.value.pData; - } - pLastCol->ts = keyTs; - pLastCol->colVal = *pColVal; - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - if (nData < pColVal->value.nData) { - taosMemoryFree(pVal); - pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData); - } else { - pLastCol->colVal.value.pData = pVal; - } - if (pColVal->value.nData) { - memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); - } - } - - if (!pLastCol->dirty) { - pLastCol->dirty = 1; - } + if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { + tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); } - taosLRUCacheRelease(pCache, h, false); } else { if (!remainCols) { @@ -870,36 +1031,14 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow } if (COL_VAL_IS_VALUE(pColVal)) { - key->ltype = 1; + key->lflag = lflag | LFLAG_LAST; LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); if (h) { SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - if (pLastCol->ts <= keyTs) { - uint8_t *pVal = NULL; - int nData = pLastCol->colVal.value.nData; - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - pVal = pLastCol->colVal.value.pData; - } - pLastCol->ts = keyTs; - pLastCol->colVal = *pColVal; - if (IS_VAR_DATA_TYPE(pColVal->value.type)) { - if (nData < pColVal->value.nData) { - taosMemoryFree(pVal); - pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData); - } else { - pLastCol->colVal.value.pData = pVal; - } - if (pColVal->value.nData) { - memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); - } - } - - if (!pLastCol->dirty) { - pLastCol->dirty = 1; - } + if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { + tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); } - taosLRUCacheRelease(pCache, h, false); } else { if (!remainCols) { @@ -943,11 +1082,11 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); - if (idxKey->key.ltype == 0) { - if (NULL == pLastCol || pLastCol->ts <= keyTs) { + if (IS_LAST_ROW_KEY(idxKey->key)) { + if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { char *value = NULL; size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); + tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen); // SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; taosThreadMutexLock(&pTsdb->rCache.rMutex); @@ -976,10 +1115,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow } } else { if (COL_VAL_IS_VALUE(pColVal)) { - if (NULL == pLastCol || pLastCol->ts <= keyTs) { + if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { char *value = NULL; size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); + tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen); // SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; taosThreadMutexLock(&pTsdb->rCache.rMutex); @@ -1007,6 +1146,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosMemoryFree(value); } } + + taosMemoryFree(pLastCol); } rocksdb_free(values_list[i]); @@ -1409,6 +1550,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA taosArraySet(pLastArray, idxKey->idx, &lastCol); taosArrayRemove(remainCols, j); + taosMemoryFree(pLastCol); taosMemoryFree(values_list[i]); } else { ++j; @@ -1517,12 +1659,18 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); const size_t klen = ROCKS_KEY_LEN; + + int8_t lflag = 0; + if (num_keys >= 2) { + lflag = (pTSchema->columns[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0; + } + for (int i = 0; i < num_keys; ++i) { int16_t cid = pTSchema->columns[i].colId; char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); - ((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid}; - ((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid}; keys_list[i] = keys; keys_list[num_keys + i] = keys + sizeof(SLastKey); @@ -1554,15 +1702,16 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE for (int i = 0; i < num_keys; ++i) { SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); taosThreadMutexLock(&pTsdb->rCache.rMutex); - if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { + if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[i], klen); } pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); - if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { + if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + taosMemoryFree(pLastCol); rocksdb_free(values_list[i]); rocksdb_free(values_list[i + num_keys]); @@ -1575,7 +1724,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE if (pLastCol->dirty) { pLastCol->dirty = 0; } - if (pLastCol->ts <= eKey && pLastCol->ts >= sKey) { + if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) { erase = true; } taosLRUCacheRelease(pTsdb->lruCache, h, erase); @@ -1591,7 +1740,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE if (pLastCol->dirty) { pLastCol->dirty = 0; } - if (pLastCol->ts <= eKey && pLastCol->ts >= sKey) { + if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) { erase = true; } taosLRUCacheRelease(pTsdb->lruCache, h, erase); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 8be8fa5bd7..be15a4fecf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -194,18 +194,8 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid pMemTable->nDel++; pMemTable->minVer = TMIN(pMemTable->minVer, version); pMemTable->maxVer = TMAX(pMemTable->maxVer, version); - /* - if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) { - tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey); - } - if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey); - } - */ - // if (eKey >= pTbData->maxKey && sKey <= pTbData->maxKey) { tsdbCacheDel(pTsdb, suid, uid, sKey, eKey); - //} tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 " at version %" PRId64, @@ -838,4 +828,4 @@ TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { pIter->row = pIter->pNode->row; return pIter->pRow; -} \ No newline at end of file +} From bc1c7545a8a9f3e2ea92e48ce8ed78816679dc3c Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Wed, 3 Apr 2024 09:38:04 +0800 Subject: [PATCH 6/6] mock commit for pk last read, need to revert --- source/dnode/vnode/src/tsdb/tsdbCache.c | 24 ++++++++++----------- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 14 ++++++------ 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 977ef177d2..ffb24fbc0e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1362,7 +1362,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr SIdxKey *idxKey = taosArrayGet(remainCols, 0); if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) { - SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID}; + SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID}; taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key}); } @@ -1385,7 +1385,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr for (int i = 0; i < num_keys; ++i) { SIdxKey *idxKey = taosArrayGet(remainCols, i); slotIds[i] = pr->pSlotIds[idxKey->idx]; - if (idxKey->key.ltype == CACHESCAN_RETRIEVE_LAST >> 3) { + if (idxKey->key.lflag == CACHESCAN_RETRIEVE_LAST >> 3) { if (NULL == lastTmpIndexArray) { lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t)); } @@ -1431,7 +1431,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } // still null, then make up a none col value - SLastCol noneCol = {.ts = TSKEY_MIN, + SLastCol noneCol = {.rowKey.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)}; if (!pLastCol) { pLastCol = &noneCol; @@ -1578,7 +1578,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache for (int i = 0; i < num_keys; ++i) { int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i]; - SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = cid}; // for select last_row, last case int32_t funcType = FUNCTION_TYPE_CACHE_LAST; if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) { @@ -1586,7 +1586,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache } if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) { int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST); - key->ltype = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3; + key->lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3; } LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); @@ -1599,7 +1599,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache taosLRUCacheRelease(pCache, h, false); } else { - SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; + SLastCol noneCol = {.rowKey.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; taosArrayPush(pLastArray, &noneCol); @@ -3232,7 +3232,7 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, for (int32_t i = 0; i < nCols; ++i) { int16_t slotId = slotIds[i]; - SLastCol col = {.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)}; + SLastCol col = {.rowKey.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)}; taosArrayPush(pColArray, &col); } *ppColArray = pColArray; @@ -3337,12 +3337,12 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC STColumn *pTColumn = &pTSchema->columns[0]; *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs})); - taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal}); + taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}); continue; } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - *pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal}; + *pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (pColVal->value.nData > 0) { pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); @@ -3392,7 +3392,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) { - SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal}; + SLastCol lastCol = {.rowKey.ts = rowTs, .colVal = *pColVal}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) { SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); taosMemoryFree(pLastCol->colVal.value.pData); @@ -3516,12 +3516,12 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, STColumn *pTColumn = &pTSchema->columns[0]; *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs})); - taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal}); + taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}); continue; } tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); - *pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal}; + *pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}; if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (pColVal->value.nData > 0) { pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 195ca59e9a..dd5da28b6b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -92,7 +92,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p p = (SFirstLastRes*)varDataVal(pRes[i]); - p->ts = pColVal->ts; + p->ts = pColVal->rowKey.ts; ts = p->ts; p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); // allNullRow = p->isNull & allNullRow; @@ -399,12 +399,12 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 for (int32_t i = 0; i < pr->numOfCols; ++i) { int32_t slotId = slotIds[i]; if (slotId == -1) { - SLastCol p = {.ts = INT64_MIN, .colVal.value.type = TSDB_DATA_TYPE_BOOL, .colVal.flag = CV_FLAG_NULL}; + SLastCol p = {.rowKey.ts = INT64_MIN, .colVal.value.type = TSDB_DATA_TYPE_BOOL, .colVal.flag = CV_FLAG_NULL}; taosArrayPush(pLastCols, &p); continue; } struct STColumn* pCol = &pr->pSchema->columns[slotId]; - SLastCol p = {.ts = INT64_MIN, .colVal.value.type = pCol->type, .colVal.flag = CV_FLAG_NULL}; + SLastCol p = {.rowKey.ts = INT64_MIN, .colVal.value.type = pCol->type, .colVal.flag = CV_FLAG_NULL}; if (IS_VAR_DATA_TYPE(pCol->type)) { p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char)); @@ -431,7 +431,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 SLastCol* p = taosArrayGet(pLastCols, k); SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k); - if (pColVal->ts > p->ts) { + if (pColVal->rowKey.ts > p->rowKey.ts) { if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) { if (!COL_VAL_IS_VALUE(&p->colVal)) { hasNotNullRow = false; @@ -443,7 +443,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } hasRes = true; - p->ts = pColVal->ts; + p->rowKey.ts = pColVal->rowKey.ts; if (k == 0) { if (TARRAY_SIZE(pTableUidList) == 0) { taosArrayPush(pTableUidList, &uid); @@ -452,8 +452,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } - if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) { - singleTableLastTs = pColVal->ts; + if (pColVal->rowKey.ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) { + singleTableLastTs = pColVal->rowKey.ts; } if (!IS_VAR_DATA_TYPE(pColVal->colVal.value.type)) {