From d59b767af3f9ecb1363c05406510d29178cd06b0 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 1 Feb 2024 17:40:32 +0800 Subject: [PATCH 1/3] enh(tsdb/cache): flag empty table when creating --- source/dnode/vnode/src/inc/vnodeInt.h | 7 + source/dnode/vnode/src/meta/metaTable.c | 109 ++++++- source/dnode/vnode/src/tsdb/tsdbCache.c | 368 ++++++++++++++++++++++-- 3 files changed, 464 insertions(+), 20 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 23f79158c3..fb6a86843c 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -214,6 +214,13 @@ int32_t tsdbBegin(STsdb* pTsdb); // int32_t tsdbPrepareCommit(STsdb* pTsdb); // int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); int32_t tsdbCacheCommit(STsdb* pTsdb); +int32_t tsdbCacheNewTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow); +int32_t tsdbCacheDropTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow); +int32_t tsdbCacheDropSubTables(STsdb* pTsdb, SArray* uids, tb_uid_t suid); +int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type); +int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type); +int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); +int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index ac2486eda1..a2c7a04ab3 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -303,6 +303,8 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb tdbTbcClose(pCtbIdxc); + (void)tsdbCacheDropSubTables(pMeta->pVnode->pTsdb, tbUidList, pReq->suid); + metaWLock(pMeta); for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) { @@ -334,6 +336,40 @@ _exit: return 0; } +static void metaGetSubtables(SMeta *pMeta, int64_t suid, SArray *uids) { + if (!uids) return; + + int c = 0; + void *pKey = NULL; + int nKey = 0; + TBC *pCtbIdxc = NULL; + + tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL); + int rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c); + if (rc < 0) { + tdbTbcClose(pCtbIdxc); + metaWLock(pMeta); + return; + } + + for (;;) { + rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, NULL, NULL); + if (rc < 0) break; + + if (((SCtbIdxKey *)pKey)->suid < suid) { + continue; + } else if (((SCtbIdxKey *)pKey)->suid > suid) { + break; + } + + taosArrayPush(uids, &(((SCtbIdxKey *)pKey)->uid)); + } + + tdbFree(pKey); + + tdbTbcClose(pCtbIdxc); +} + int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { SMetaEntry oStbEntry = {0}; SMetaEntry nStbEntry = {0}; @@ -397,9 +433,39 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { nStbEntry.stbEntry.schemaRow = pReq->schemaRow; nStbEntry.stbEntry.schemaTag = pReq->schemaTag; - int32_t deltaCol = pReq->schemaRow.nCols - oStbEntry.stbEntry.schemaRow.nCols; + int nCols = pReq->schemaRow.nCols; + int onCols = oStbEntry.stbEntry.schemaRow.nCols; + int32_t deltaCol = nCols - onCols; bool updStat = deltaCol != 0 && !metaTbInFilterCache(pMeta, pReq->name, 1); + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + STsdb *pTsdb = pMeta->pVnode->pTsdb; + SArray *uids = taosArrayInit(8, sizeof(int64_t)); + if (deltaCol == 1) { + int16_t cid = pReq->schemaRow.pSchema[nCols - 1].colId; + int8_t col_type = pReq->schemaRow.pSchema[nCols - 1].type; + + metaGetSubtables(pMeta, pReq->suid, uids); + tsdbCacheNewSTableColumn(pTsdb, uids, cid, col_type); + } else if (deltaCol == -1) { + int16_t cid = -1; + int8_t col_type = -1; + for (int i = 0, j = 0; i < nCols && j < onCols; ++i, ++j) { + if (pReq->schemaRow.pSchema[i].colId != oStbEntry.stbEntry.schemaRow.pSchema[j].colId) { + cid = oStbEntry.stbEntry.schemaRow.pSchema[j].colId; + col_type = oStbEntry.stbEntry.schemaRow.pSchema[j].type; + break; + } + } + + if (cid != -1) { + metaGetSubtables(pMeta, pReq->suid, uids); + tsdbCacheDropSTableColumn(pTsdb, uids, cid, col_type); + } + } + if (uids) taosArrayDestroy(uids); + } + metaWLock(pMeta); // compare two entry if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) { @@ -822,6 +888,10 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs metaUidCacheClear(pMeta, me.ctbEntry.suid); metaTbGroupCacheClear(pMeta, me.ctbEntry.suid); metaULock(pMeta); + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheNewTable(pMeta->pVnode->pTsdb, me.uid, me.ctbEntry.suid, NULL); + } } else { me.ntbEntry.btime = pReq->btime; me.ntbEntry.ttlDays = pReq->ttl; @@ -832,6 +902,10 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs ++pStats->numOfNTables; pStats->numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1; + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheNewTable(pMeta->pVnode->pTsdb, me.uid, -1, &me.ntbEntry.schemaRow); + } } if (metaHandleEntry(pMeta, &me) < 0) goto _err; @@ -896,6 +970,10 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi if ((type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE) && tbUids) { taosArrayPush(tbUids, &uid); + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheDropTable(pMeta->pVnode->pTsdb, uid, suid, NULL); + } } if ((type == TSDB_CHILD_TABLE) && tbUid) { @@ -930,6 +1008,11 @@ void metaDropTables(SMeta *pMeta, SArray *tbUids) { } tSimpleHashPut(suidHash, &suid, sizeof(tb_uid_t), &nCtbDropped, sizeof(int64_t)); } + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheDropTable(pMeta->pVnode->pTsdb, uid, suid, NULL); + } + metaDebug("batch drop table:%" PRId64, uid); } metaULock(pMeta); @@ -1172,11 +1255,22 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1, 0); metaUidCacheClear(pMeta, e.ctbEntry.suid); metaTbGroupCacheClear(pMeta, e.ctbEntry.suid); + /* + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheDropTable(pMeta->pVnode->pTsdb, e.uid, e.ctbEntry.suid, NULL); + } + */ } else if (e.type == TSDB_NORMAL_TABLE) { // drop schema.db (todo) --pMeta->pVnode->config.vndStats.numOfNTables; pMeta->pVnode->config.vndStats.numOfNTimeSeries -= e.ntbEntry.schemaRow.nCols - 1; + + /* + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheDropTable(pMeta->pVnode->pTsdb, e.uid, -1, &e.ntbEntry.schemaRow); + } + */ } else if (e.type == TSDB_SUPER_TABLE) { tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), pMeta->txn); // drop schema.db (todo) @@ -1364,6 +1458,12 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl ++pMeta->pVnode->config.vndStats.numOfNTimeSeries; metaTimeSeriesNotifyCheck(pMeta); + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + int16_t cid = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId; + int8_t col_type = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type; + (void)tsdbCacheNewNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type); + } break; case TSDB_ALTER_TABLE_DROP_COLUMN: if (pColumn == NULL) { @@ -1386,6 +1486,13 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl pSchema->nCols--; --pMeta->pVnode->config.vndStats.numOfNTimeSeries; + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + int16_t cid = pColumn->colId; + int8_t col_type = pColumn->type; + + (void)tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type); + } break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: if (pColumn == NULL) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index cc0bf2b774..98ac4333d0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -431,25 +431,6 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { return code; } -static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) { - SLastCol *pLastCol = NULL; - - char *err = NULL; - size_t vlen = 0; - SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; - size_t klen = ROCKS_KEY_LEN; - char *value = NULL; - value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } - - pLastCol = tsdbCacheDeserialize(value); - - return pLastCol; -} - static void reallocVarData(SColVal *pColVal) { if (IS_VAR_DATA_TYPE(pColVal->type)) { uint8_t *pVal = pColVal->value.pData; @@ -476,6 +457,355 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud taosMemoryFree(value); } +static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) { + int32_t code = 0; + + SLRUCache *pCache = pTsdb->lruCache; + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1}; + SLastCol *pLastCol = &noneCol; + + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + reallocVarData(&pLastCol->colVal); + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + SLastKey *pLastKey = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, + TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + /* + // store result back to rocks cache + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(pLastCol, &value, &vlen); + + SLastKey *key = pLastKey; + size_t klen = ROCKS_KEY_LEN; + rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); + taosMemoryFree(value); + */ + return code; +} + +int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) { + int32_t code = 0; + char *err = NULL; + + SLRUCache *pCache = pTsdb->lruCache; + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + + taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState); + + rocksMayWrite(pTsdb, true, false, false); + rocksMayWrite(pTsdb, true, true, false); + rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); + + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + code = -1; + } + + return code; +} + +static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) { + int32_t code = 0; + + // build keys & multi get from rocks + char **keys_list = taosMemoryCalloc(2, sizeof(char *)); + size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t)); + const size_t klen = ROCKS_KEY_LEN; + + char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); + ((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid}; + + keys_list[0] = keys; + keys_list[1] = keys + sizeof(SLastKey); + keys_list_sizes[0] = klen; + keys_list_sizes[1] = klen; + + char **values_list = taosMemoryCalloc(2, sizeof(char *)); + size_t *values_list_sizes = taosMemoryCalloc(2, sizeof(size_t)); + char **errs = taosMemoryCalloc(2, sizeof(char *)); + + // rocksMayWrite(pTsdb, true, false, false); + rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, 2, (const char *const *)keys_list, keys_list_sizes, + values_list, values_list_sizes, errs); + + for (int i = 0; i < 2; ++i) { + if (errs[i]) { + rocksdb_free(errs[i]); + } + } + taosMemoryFree(errs); + + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + { + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0]); + if (NULL != pLastCol) { + rocksdb_writebatch_delete(wb, keys_list[0], klen); + } + pLastCol = tsdbCacheDeserialize(values_list[1]); + if (NULL != pLastCol) { + rocksdb_writebatch_delete(wb, keys_list[1], klen); + } + + rocksdb_free(values_list[0]); + rocksdb_free(values_list[1]); + + bool erase = false; + LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[0], klen); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h); + erase = true; + + taosLRUCacheRelease(pTsdb->lruCache, h, erase); + } + if (erase) { + taosLRUCacheErase(pTsdb->lruCache, keys_list[0], klen); + } + + erase = false; + h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[1], klen); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h); + erase = true; + + taosLRUCacheRelease(pTsdb->lruCache, h, erase); + } + if (erase) { + taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen); + } + } + + taosMemoryFree(keys_list[0]); + + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); + + return code; +} + +int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) { + int32_t code = 0; + + taosThreadMutexLock(&pTsdb->lruMutex); + + if (suid < 0) { + int nCols = pSchemaRow->nCols; + for (int i = 0; i < nCols; ++i) { + int16_t cid = pSchemaRow->pSchema[i].colId; + int8_t col_type = pSchemaRow->pSchema[i].type; + + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); + } + } else { + STSchema *pTSchema = NULL; + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + + int nCols = pTSchema->numOfCols; + for (int i = 0; i < nCols; ++i) { + int16_t cid = pTSchema->columns[i].colId; + int8_t col_type = pTSchema->columns[i].type; + + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); + } + + taosMemoryFree(pTSchema); + } + + taosThreadMutexUnlock(&pTsdb->lruMutex); + + return code; +} + +int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) { + int32_t code = 0; + + taosThreadMutexLock(&pTsdb->lruMutex); + + (void)tsdbCacheCommitNoLock(pTsdb); + + if (suid < 0) { + int nCols = pSchemaRow->nCols; + for (int i = 0; i < nCols; ++i) { + int16_t cid = pSchemaRow->pSchema[i].colId; + int8_t col_type = pSchemaRow->pSchema[i].type; + + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + } + } else { + STSchema *pTSchema = NULL; + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + + int nCols = pTSchema->numOfCols; + for (int i = 0; i < nCols; ++i) { + int16_t cid = pTSchema->columns[i].colId; + int8_t col_type = pTSchema->columns[i].type; + + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + } + + taosMemoryFree(pTSchema); + } + + rocksMayWrite(pTsdb, true, false, false); + + taosThreadMutexUnlock(&pTsdb->lruMutex); + + return code; +} + +int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) { + int32_t code = 0; + + taosThreadMutexLock(&pTsdb->lruMutex); + + (void)tsdbCacheCommitNoLock(pTsdb); + + STSchema *pTSchema = NULL; + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + for (int i = 0; i < TARRAY_SIZE(uids); ++i) { + int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i]; + + int nCols = pTSchema->numOfCols; + for (int i = 0; i < nCols; ++i) { + int16_t cid = pTSchema->columns[i].colId; + int8_t col_type = pTSchema->columns[i].type; + + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + } + } + + taosMemoryFree(pTSchema); + + rocksMayWrite(pTsdb, true, false, false); + + taosThreadMutexUnlock(&pTsdb->lruMutex); + + return code; +} + +int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) { + int32_t code = 0; + + taosThreadMutexLock(&pTsdb->lruMutex); + + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); + + // rocksMayWrite(pTsdb, true, false, false); + taosThreadMutexUnlock(&pTsdb->lruMutex); + //(void)tsdbCacheCommit(pTsdb); + + return code; +} + +int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) { + int32_t code = 0; + + taosThreadMutexLock(&pTsdb->lruMutex); + + (void)tsdbCacheCommitNoLock(pTsdb); + + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + + rocksMayWrite(pTsdb, true, false, true); + + taosThreadMutexUnlock(&pTsdb->lruMutex); + + return code; +} + +int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) { + int32_t code = 0; + + taosThreadMutexLock(&pTsdb->lruMutex); + + for (int i = 0; i < TARRAY_SIZE(uids); ++i) { + tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i]; + + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); + } + + // rocksMayWrite(pTsdb, true, false, false); + taosThreadMutexUnlock(&pTsdb->lruMutex); + //(void)tsdbCacheCommit(pTsdb); + + return code; +} + +int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) { + int32_t code = 0; + + taosThreadMutexLock(&pTsdb->lruMutex); + + (void)tsdbCacheCommitNoLock(pTsdb); + + for (int i = 0; i < TARRAY_SIZE(uids); ++i) { + int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i]; + + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + } + + rocksMayWrite(pTsdb, true, false, true); + + taosThreadMutexUnlock(&pTsdb->lruMutex); + + return code; +} + +static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) { + SLastCol *pLastCol = NULL; + + char *err = NULL; + size_t vlen = 0; + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + size_t klen = ROCKS_KEY_LEN; + char *value = NULL; + value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } + + pLastCol = tsdbCacheDeserialize(value); + + return pLastCol; +} + typedef struct { int idx; SLastKey key; From 3562ef433475b51b2ab819b70d269a3b1a408675 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 2 Feb 2024 12:29:29 +0800 Subject: [PATCH 2/3] fix(query/cache): separate null inserting --- tests/system-test/2-query/last_row.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/system-test/2-query/last_row.py b/tests/system-test/2-query/last_row.py index 0744b3bae5..13d1e47f76 100644 --- a/tests/system-test/2-query/last_row.py +++ b/tests/system-test/2-query/last_row.py @@ -117,6 +117,10 @@ class TDTestCase: ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" ) ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" ) ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" ) + ''' + ) + tdSql.execute( + f'''insert into {dbname}.t1 values ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ''' ) @@ -179,6 +183,10 @@ class TDTestCase: ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" ) ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" ) ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" ) + ''' + ) + tdSql.execute( + f'''insert into {dbname}.t1 values ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ''' ) From 85738649ce6c8fb2989878d1d13cd36243a395d2 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 20 Feb 2024 11:14:28 +0800 Subject: [PATCH 3/3] fix(test/query): keep cache query consistent with none model --- tests/system-test/2-query/last_cache_scan.py | 50 ++++++++++++-------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/tests/system-test/2-query/last_cache_scan.py b/tests/system-test/2-query/last_cache_scan.py index b267cedc9d..049fe60f16 100644 --- a/tests/system-test/2-query/last_cache_scan.py +++ b/tests/system-test/2-query/last_cache_scan.py @@ -24,6 +24,11 @@ class TDTestCase: self.ctbNum = 10 self.rowsPerTbl = 10000 self.duraion = '1h' + self.cachemodel = 'both' + self.cacheEnable = True + #self.cacheEnable = False + if not self.cacheEnable: + self.cachemodel = 'none' def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -34,7 +39,7 @@ class TDTestCase: if dropFlag == 1: tsql.execute("drop database if exists %s"%(dbName)) - tsql.execute("create database if not exists %s vgroups %d replica %d duration %s CACHEMODEL 'both'"%(dbName, vgroups, replica, duration)) + tsql.execute("create database if not exists %s vgroups %d replica %d duration %s CACHEMODEL '%s'"%(dbName, vgroups, replica, duration, self.cachemodel)) tdLog.debug("complete to create database %s"%(dbName)) return @@ -130,6 +135,9 @@ class TDTestCase: return def check_explain_res_has_row(self, plan_str_expect: str, rows, sql): + if not self.cacheEnable: + return + plan_found = False for row in rows: if str(row).find(plan_str_expect) >= 0: @@ -343,10 +351,10 @@ class TDTestCase: p.check_returncode() tdSql.query_success_failed("select ts, last(c1), c1, ts, c1 from meters", queryTimes=10, expectErrInfo="Invalid column name: c1") tdSql.query('select last(c12), c12, ts from meters', queryTimes=1) - tdSql.checkRows(1) - tdSql.checkCols(3) - tdSql.checkData(0, 0, None) - tdSql.checkData(0, 1, None) + tdSql.checkRows(0) + #tdSql.checkCols(3) + #tdSql.checkData(0, 0, None) + #tdSql.checkData(0, 1, None) def test_cache_scan_with_drop_column(self): tdSql.query('select last(*) from meters') @@ -378,41 +386,41 @@ class TDTestCase: p.check_returncode() tdSql.query_success_failed("select ts, last(c2), c12, ts, c12 from meters", queryTimes=10, expectErrInfo="Invalid column name: c2") tdSql.query('select last(c1), c1, ts from meters', queryTimes=1) - tdSql.checkRows(1) - tdSql.checkCols(3) - tdSql.checkData(0, 0, None) - tdSql.checkData(0, 1, None) + tdSql.checkRows(0) + #tdSql.checkCols(3) + #tdSql.checkData(0, 0, None) + #tdSql.checkData(0, 1, None) def test_cache_scan_last_row_with_partition_by(self): tdSql.query('select last(c1) from meters partition by t1') print(str(tdSql.queryResult)) - tdSql.checkCols(1) - tdSql.checkRows(5) + #tdSql.checkCols(1) + tdSql.checkRows(0) p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c2 int"]) p.check_returncode() tdSql.query_success_failed('select last(c1) from meters partition by t1', queryTimes=10, expectErrInfo="Invalid column name: c1") tdSql.query('select last(c2), c2, ts from meters', queryTimes=1) print(str(tdSql.queryResult)) - tdSql.checkRows(1) - tdSql.checkCols(3) - tdSql.checkData(0, 0, None) - tdSql.checkData(0, 1, None) + tdSql.checkRows(0) + #tdSql.checkCols(3) + #tdSql.checkData(0, 0, None) + #tdSql.checkData(0, 1, None) def test_cache_scan_last_row_with_partition_by_tbname(self): tdSql.query('select last(c2) from meters partition by tbname') print(str(tdSql.queryResult)) - tdSql.checkCols(1) - tdSql.checkRows(10) + #tdSql.checkCols(1) + tdSql.checkRows(0) p = subprocess.run(["taos", '-s', "alter table test.meters drop column c2; alter table test.meters add column c1 int"]) p.check_returncode() tdSql.query_success_failed('select last_row(c2) from meters partition by tbname', queryTimes=10, expectErrInfo="Invalid column name: c2") tdSql.query('select last(c1), c1, ts from meters', queryTimes=1) print(str(tdSql.queryResult)) - tdSql.checkRows(1) - tdSql.checkCols(3) - tdSql.checkData(0, 0, None) - tdSql.checkData(0, 1, None) + tdSql.checkRows(0) + #tdSql.checkCols(3) + #tdSql.checkData(0, 0, None) + #tdSql.checkData(0, 1, None)