From d59b767af3f9ecb1363c05406510d29178cd06b0 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 1 Feb 2024 17:40:32 +0800 Subject: [PATCH 01/12] enh(tsdb/cache): flag empty table when creating --- source/dnode/vnode/src/inc/vnodeInt.h | 7 + source/dnode/vnode/src/meta/metaTable.c | 109 ++++++- source/dnode/vnode/src/tsdb/tsdbCache.c | 368 ++++++++++++++++++++++-- 3 files changed, 464 insertions(+), 20 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 23f79158c3..fb6a86843c 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -214,6 +214,13 @@ int32_t tsdbBegin(STsdb* pTsdb); // int32_t tsdbPrepareCommit(STsdb* pTsdb); // int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); int32_t tsdbCacheCommit(STsdb* pTsdb); +int32_t tsdbCacheNewTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow); +int32_t tsdbCacheDropTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow); +int32_t tsdbCacheDropSubTables(STsdb* pTsdb, SArray* uids, tb_uid_t suid); +int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type); +int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type); +int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); +int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index ac2486eda1..a2c7a04ab3 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -303,6 +303,8 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb tdbTbcClose(pCtbIdxc); + (void)tsdbCacheDropSubTables(pMeta->pVnode->pTsdb, tbUidList, pReq->suid); + metaWLock(pMeta); for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) { @@ -334,6 +336,40 @@ _exit: return 0; } +static void metaGetSubtables(SMeta *pMeta, int64_t suid, SArray *uids) { + if (!uids) return; + + int c = 0; + void *pKey = NULL; + int nKey = 0; + TBC *pCtbIdxc = NULL; + + tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL); + int rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c); + if (rc < 0) { + tdbTbcClose(pCtbIdxc); + metaWLock(pMeta); + return; + } + + for (;;) { + rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, NULL, NULL); + if (rc < 0) break; + + if (((SCtbIdxKey *)pKey)->suid < suid) { + continue; + } else if (((SCtbIdxKey *)pKey)->suid > suid) { + break; + } + + taosArrayPush(uids, &(((SCtbIdxKey *)pKey)->uid)); + } + + tdbFree(pKey); + + tdbTbcClose(pCtbIdxc); +} + int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { SMetaEntry oStbEntry = {0}; SMetaEntry nStbEntry = {0}; @@ -397,9 +433,39 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { nStbEntry.stbEntry.schemaRow = pReq->schemaRow; nStbEntry.stbEntry.schemaTag = pReq->schemaTag; - int32_t deltaCol = pReq->schemaRow.nCols - oStbEntry.stbEntry.schemaRow.nCols; + int nCols = pReq->schemaRow.nCols; + int onCols = oStbEntry.stbEntry.schemaRow.nCols; + int32_t deltaCol = nCols - onCols; bool updStat = deltaCol != 0 && !metaTbInFilterCache(pMeta, pReq->name, 1); + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + STsdb *pTsdb = pMeta->pVnode->pTsdb; + SArray *uids = taosArrayInit(8, sizeof(int64_t)); + if (deltaCol == 1) { + int16_t cid = pReq->schemaRow.pSchema[nCols - 1].colId; + int8_t col_type = pReq->schemaRow.pSchema[nCols - 1].type; + + metaGetSubtables(pMeta, pReq->suid, uids); + tsdbCacheNewSTableColumn(pTsdb, uids, cid, col_type); + } else if (deltaCol == -1) { + int16_t cid = -1; + int8_t col_type = -1; + for (int i = 0, j = 0; i < nCols && j < onCols; ++i, ++j) { + if (pReq->schemaRow.pSchema[i].colId != oStbEntry.stbEntry.schemaRow.pSchema[j].colId) { + cid = oStbEntry.stbEntry.schemaRow.pSchema[j].colId; + col_type = oStbEntry.stbEntry.schemaRow.pSchema[j].type; + break; + } + } + + if (cid != -1) { + metaGetSubtables(pMeta, pReq->suid, uids); + tsdbCacheDropSTableColumn(pTsdb, uids, cid, col_type); + } + } + if (uids) taosArrayDestroy(uids); + } + metaWLock(pMeta); // compare two entry if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) { @@ -822,6 +888,10 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs metaUidCacheClear(pMeta, me.ctbEntry.suid); metaTbGroupCacheClear(pMeta, me.ctbEntry.suid); metaULock(pMeta); + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheNewTable(pMeta->pVnode->pTsdb, me.uid, me.ctbEntry.suid, NULL); + } } else { me.ntbEntry.btime = pReq->btime; me.ntbEntry.ttlDays = pReq->ttl; @@ -832,6 +902,10 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs ++pStats->numOfNTables; pStats->numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1; + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheNewTable(pMeta->pVnode->pTsdb, me.uid, -1, &me.ntbEntry.schemaRow); + } } if (metaHandleEntry(pMeta, &me) < 0) goto _err; @@ -896,6 +970,10 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi if ((type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE) && tbUids) { taosArrayPush(tbUids, &uid); + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheDropTable(pMeta->pVnode->pTsdb, uid, suid, NULL); + } } if ((type == TSDB_CHILD_TABLE) && tbUid) { @@ -930,6 +1008,11 @@ void metaDropTables(SMeta *pMeta, SArray *tbUids) { } tSimpleHashPut(suidHash, &suid, sizeof(tb_uid_t), &nCtbDropped, sizeof(int64_t)); } + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheDropTable(pMeta->pVnode->pTsdb, uid, suid, NULL); + } + metaDebug("batch drop table:%" PRId64, uid); } metaULock(pMeta); @@ -1172,11 +1255,22 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1, 0); metaUidCacheClear(pMeta, e.ctbEntry.suid); metaTbGroupCacheClear(pMeta, e.ctbEntry.suid); + /* + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheDropTable(pMeta->pVnode->pTsdb, e.uid, e.ctbEntry.suid, NULL); + } + */ } else if (e.type == TSDB_NORMAL_TABLE) { // drop schema.db (todo) --pMeta->pVnode->config.vndStats.numOfNTables; pMeta->pVnode->config.vndStats.numOfNTimeSeries -= e.ntbEntry.schemaRow.nCols - 1; + + /* + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheDropTable(pMeta->pVnode->pTsdb, e.uid, -1, &e.ntbEntry.schemaRow); + } + */ } else if (e.type == TSDB_SUPER_TABLE) { tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), pMeta->txn); // drop schema.db (todo) @@ -1364,6 +1458,12 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl ++pMeta->pVnode->config.vndStats.numOfNTimeSeries; metaTimeSeriesNotifyCheck(pMeta); + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + int16_t cid = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId; + int8_t col_type = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type; + (void)tsdbCacheNewNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type); + } break; case TSDB_ALTER_TABLE_DROP_COLUMN: if (pColumn == NULL) { @@ -1386,6 +1486,13 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl pSchema->nCols--; --pMeta->pVnode->config.vndStats.numOfNTimeSeries; + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + int16_t cid = pColumn->colId; + int8_t col_type = pColumn->type; + + (void)tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type); + } break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: if (pColumn == NULL) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index cc0bf2b774..98ac4333d0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -431,25 +431,6 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { return code; } -static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) { - SLastCol *pLastCol = NULL; - - char *err = NULL; - size_t vlen = 0; - SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; - size_t klen = ROCKS_KEY_LEN; - char *value = NULL; - value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); - rocksdb_free(err); - } - - pLastCol = tsdbCacheDeserialize(value); - - return pLastCol; -} - static void reallocVarData(SColVal *pColVal) { if (IS_VAR_DATA_TYPE(pColVal->type)) { uint8_t *pVal = pColVal->value.pData; @@ -476,6 +457,355 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud taosMemoryFree(value); } +static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) { + int32_t code = 0; + + SLRUCache *pCache = pTsdb->lruCache; + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1}; + SLastCol *pLastCol = &noneCol; + + SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); + *pTmpLastCol = *pLastCol; + pLastCol = pTmpLastCol; + + reallocVarData(&pLastCol->colVal); + size_t charge = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + charge += pLastCol->colVal.value.nData; + } + + SLastKey *pLastKey = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, + TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + /* + // store result back to rocks cache + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(pLastCol, &value, &vlen); + + SLastKey *key = pLastKey; + size_t klen = ROCKS_KEY_LEN; + rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); + taosMemoryFree(value); + */ + return code; +} + +int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) { + int32_t code = 0; + char *err = NULL; + + SLRUCache *pCache = pTsdb->lruCache; + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + + taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState); + + rocksMayWrite(pTsdb, true, false, false); + rocksMayWrite(pTsdb, true, true, false); + rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); + + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + code = -1; + } + + return code; +} + +static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) { + int32_t code = 0; + + // build keys & multi get from rocks + char **keys_list = taosMemoryCalloc(2, sizeof(char *)); + size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t)); + const size_t klen = ROCKS_KEY_LEN; + + char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); + ((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid}; + ((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid}; + + keys_list[0] = keys; + keys_list[1] = keys + sizeof(SLastKey); + keys_list_sizes[0] = klen; + keys_list_sizes[1] = klen; + + char **values_list = taosMemoryCalloc(2, sizeof(char *)); + size_t *values_list_sizes = taosMemoryCalloc(2, sizeof(size_t)); + char **errs = taosMemoryCalloc(2, sizeof(char *)); + + // rocksMayWrite(pTsdb, true, false, false); + rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, 2, (const char *const *)keys_list, keys_list_sizes, + values_list, values_list_sizes, errs); + + for (int i = 0; i < 2; ++i) { + if (errs[i]) { + rocksdb_free(errs[i]); + } + } + taosMemoryFree(errs); + + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + { + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0]); + if (NULL != pLastCol) { + rocksdb_writebatch_delete(wb, keys_list[0], klen); + } + pLastCol = tsdbCacheDeserialize(values_list[1]); + if (NULL != pLastCol) { + rocksdb_writebatch_delete(wb, keys_list[1], klen); + } + + rocksdb_free(values_list[0]); + rocksdb_free(values_list[1]); + + bool erase = false; + LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[0], klen); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h); + erase = true; + + taosLRUCacheRelease(pTsdb->lruCache, h, erase); + } + if (erase) { + taosLRUCacheErase(pTsdb->lruCache, keys_list[0], klen); + } + + erase = false; + h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[1], klen); + if (h) { + SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h); + erase = true; + + taosLRUCacheRelease(pTsdb->lruCache, h, erase); + } + if (erase) { + taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen); + } + } + + taosMemoryFree(keys_list[0]); + + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); + + return code; +} + +int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) { + int32_t code = 0; + + taosThreadMutexLock(&pTsdb->lruMutex); + + if (suid < 0) { + int nCols = pSchemaRow->nCols; + for (int i = 0; i < nCols; ++i) { + int16_t cid = pSchemaRow->pSchema[i].colId; + int8_t col_type = pSchemaRow->pSchema[i].type; + + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); + } + } else { + STSchema *pTSchema = NULL; + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + + int nCols = pTSchema->numOfCols; + for (int i = 0; i < nCols; ++i) { + int16_t cid = pTSchema->columns[i].colId; + int8_t col_type = pTSchema->columns[i].type; + + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); + } + + taosMemoryFree(pTSchema); + } + + taosThreadMutexUnlock(&pTsdb->lruMutex); + + return code; +} + +int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) { + int32_t code = 0; + + taosThreadMutexLock(&pTsdb->lruMutex); + + (void)tsdbCacheCommitNoLock(pTsdb); + + if (suid < 0) { + int nCols = pSchemaRow->nCols; + for (int i = 0; i < nCols; ++i) { + int16_t cid = pSchemaRow->pSchema[i].colId; + int8_t col_type = pSchemaRow->pSchema[i].type; + + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + } + } else { + STSchema *pTSchema = NULL; + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + + int nCols = pTSchema->numOfCols; + for (int i = 0; i < nCols; ++i) { + int16_t cid = pTSchema->columns[i].colId; + int8_t col_type = pTSchema->columns[i].type; + + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + } + + taosMemoryFree(pTSchema); + } + + rocksMayWrite(pTsdb, true, false, false); + + taosThreadMutexUnlock(&pTsdb->lruMutex); + + return code; +} + +int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) { + int32_t code = 0; + + taosThreadMutexLock(&pTsdb->lruMutex); + + (void)tsdbCacheCommitNoLock(pTsdb); + + STSchema *pTSchema = NULL; + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + for (int i = 0; i < TARRAY_SIZE(uids); ++i) { + int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i]; + + int nCols = pTSchema->numOfCols; + for (int i = 0; i < nCols; ++i) { + int16_t cid = pTSchema->columns[i].colId; + int8_t col_type = pTSchema->columns[i].type; + + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + } + } + + taosMemoryFree(pTSchema); + + rocksMayWrite(pTsdb, true, false, false); + + taosThreadMutexUnlock(&pTsdb->lruMutex); + + return code; +} + +int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) { + int32_t code = 0; + + taosThreadMutexLock(&pTsdb->lruMutex); + + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); + + // rocksMayWrite(pTsdb, true, false, false); + taosThreadMutexUnlock(&pTsdb->lruMutex); + //(void)tsdbCacheCommit(pTsdb); + + return code; +} + +int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) { + int32_t code = 0; + + taosThreadMutexLock(&pTsdb->lruMutex); + + (void)tsdbCacheCommitNoLock(pTsdb); + + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + + rocksMayWrite(pTsdb, true, false, true); + + taosThreadMutexUnlock(&pTsdb->lruMutex); + + return code; +} + +int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) { + int32_t code = 0; + + taosThreadMutexLock(&pTsdb->lruMutex); + + for (int i = 0; i < TARRAY_SIZE(uids); ++i) { + tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i]; + + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); + } + + // rocksMayWrite(pTsdb, true, false, false); + taosThreadMutexUnlock(&pTsdb->lruMutex); + //(void)tsdbCacheCommit(pTsdb); + + return code; +} + +int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) { + int32_t code = 0; + + taosThreadMutexLock(&pTsdb->lruMutex); + + (void)tsdbCacheCommitNoLock(pTsdb); + + for (int i = 0; i < TARRAY_SIZE(uids); ++i) { + int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i]; + + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); + (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1); + } + + rocksMayWrite(pTsdb, true, false, true); + + taosThreadMutexUnlock(&pTsdb->lruMutex); + + return code; +} + +static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) { + SLastCol *pLastCol = NULL; + + char *err = NULL; + size_t vlen = 0; + SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + size_t klen = ROCKS_KEY_LEN; + char *value = NULL; + value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } + + pLastCol = tsdbCacheDeserialize(value); + + return pLastCol; +} + typedef struct { int idx; SLastKey key; From 3562ef433475b51b2ab819b70d269a3b1a408675 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 2 Feb 2024 12:29:29 +0800 Subject: [PATCH 02/12] fix(query/cache): separate null inserting --- tests/system-test/2-query/last_row.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/system-test/2-query/last_row.py b/tests/system-test/2-query/last_row.py index 0744b3bae5..13d1e47f76 100644 --- a/tests/system-test/2-query/last_row.py +++ b/tests/system-test/2-query/last_row.py @@ -117,6 +117,10 @@ class TDTestCase: ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" ) ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" ) ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" ) + ''' + ) + tdSql.execute( + f'''insert into {dbname}.t1 values ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ''' ) @@ -179,6 +183,10 @@ class TDTestCase: ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" ) ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" ) ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" ) + ''' + ) + tdSql.execute( + f'''insert into {dbname}.t1 values ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ''' ) From ad0c91e6c3e29c48ec52d3b827d5972729db3916 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 19 Feb 2024 14:06:45 +0800 Subject: [PATCH 03/12] fix: invalid memory access during meta snapshot transfer --- source/dnode/vnode/src/meta/metaSnapshot.c | 41 +++++++++++----------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index 18190ac533..e86ed3b657 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -96,29 +96,29 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) { continue; } + if (!pData || !nData) { + metaError("meta/snap: invalide nData: %" PRId32 " meta snap read failed.", nData); + goto _exit; + } + + *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData); + if (*ppData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); + pHdr->type = SNAP_DATA_META; + pHdr->size = nData; + memcpy(pHdr->data, pData, nData); + + metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d", + TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData); + tdbTbcMoveToNext(pReader->pTbc); break; } - if (!pData || !nData) { - metaError("meta/snap: invalide nData: %" PRId32 " meta snap read failed.", nData); - goto _exit; - } - - *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData); - if (*ppData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); - pHdr->type = SNAP_DATA_META; - pHdr->size = nData; - memcpy(pHdr->data, pData, nData); - - metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d", - TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData); - _exit: return code; @@ -619,7 +619,8 @@ SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext* ctx) { int32_t ret = MoveToPosition(ctx, idInfo->version, *uidTmp); if (ret != 0) { - metaDebug("tmqsnap getMetaTableInfoFromSnapshot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp, idInfo->version); + metaDebug("tmqsnap getMetaTableInfoFromSnapshot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp, + idInfo->version); continue; } tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen); From 0e8170de49f74f1485d5bb99bc3dc311b2b77697 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 20 Feb 2024 10:17:47 +0800 Subject: [PATCH 04/12] remove invalid code --- include/libs/stream/tstream.h | 1 - source/libs/stream/src/streamCheckpoint.c | 12 ------------ 2 files changed, 13 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 11eb34557b..64ce735843 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -657,7 +657,6 @@ int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckp int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq); int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp); -int32_t tDecodeStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp); typedef struct { SMsgHead msgHead; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 4614cb0cee..607e31bfe6 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -68,18 +68,6 @@ int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckp return pEncoder->pos; } -int32_t tDecodeStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->nodeId) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->expireTime) < 0) return -1; - if (tDecodeI8(pDecoder, &pRsp->success) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; From 85738649ce6c8fb2989878d1d13cd36243a395d2 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 20 Feb 2024 11:14:28 +0800 Subject: [PATCH 05/12] fix(test/query): keep cache query consistent with none model --- tests/system-test/2-query/last_cache_scan.py | 50 ++++++++++++-------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/tests/system-test/2-query/last_cache_scan.py b/tests/system-test/2-query/last_cache_scan.py index b267cedc9d..049fe60f16 100644 --- a/tests/system-test/2-query/last_cache_scan.py +++ b/tests/system-test/2-query/last_cache_scan.py @@ -24,6 +24,11 @@ class TDTestCase: self.ctbNum = 10 self.rowsPerTbl = 10000 self.duraion = '1h' + self.cachemodel = 'both' + self.cacheEnable = True + #self.cacheEnable = False + if not self.cacheEnable: + self.cachemodel = 'none' def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -34,7 +39,7 @@ class TDTestCase: if dropFlag == 1: tsql.execute("drop database if exists %s"%(dbName)) - tsql.execute("create database if not exists %s vgroups %d replica %d duration %s CACHEMODEL 'both'"%(dbName, vgroups, replica, duration)) + tsql.execute("create database if not exists %s vgroups %d replica %d duration %s CACHEMODEL '%s'"%(dbName, vgroups, replica, duration, self.cachemodel)) tdLog.debug("complete to create database %s"%(dbName)) return @@ -130,6 +135,9 @@ class TDTestCase: return def check_explain_res_has_row(self, plan_str_expect: str, rows, sql): + if not self.cacheEnable: + return + plan_found = False for row in rows: if str(row).find(plan_str_expect) >= 0: @@ -343,10 +351,10 @@ class TDTestCase: p.check_returncode() tdSql.query_success_failed("select ts, last(c1), c1, ts, c1 from meters", queryTimes=10, expectErrInfo="Invalid column name: c1") tdSql.query('select last(c12), c12, ts from meters', queryTimes=1) - tdSql.checkRows(1) - tdSql.checkCols(3) - tdSql.checkData(0, 0, None) - tdSql.checkData(0, 1, None) + tdSql.checkRows(0) + #tdSql.checkCols(3) + #tdSql.checkData(0, 0, None) + #tdSql.checkData(0, 1, None) def test_cache_scan_with_drop_column(self): tdSql.query('select last(*) from meters') @@ -378,41 +386,41 @@ class TDTestCase: p.check_returncode() tdSql.query_success_failed("select ts, last(c2), c12, ts, c12 from meters", queryTimes=10, expectErrInfo="Invalid column name: c2") tdSql.query('select last(c1), c1, ts from meters', queryTimes=1) - tdSql.checkRows(1) - tdSql.checkCols(3) - tdSql.checkData(0, 0, None) - tdSql.checkData(0, 1, None) + tdSql.checkRows(0) + #tdSql.checkCols(3) + #tdSql.checkData(0, 0, None) + #tdSql.checkData(0, 1, None) def test_cache_scan_last_row_with_partition_by(self): tdSql.query('select last(c1) from meters partition by t1') print(str(tdSql.queryResult)) - tdSql.checkCols(1) - tdSql.checkRows(5) + #tdSql.checkCols(1) + tdSql.checkRows(0) p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c2 int"]) p.check_returncode() tdSql.query_success_failed('select last(c1) from meters partition by t1', queryTimes=10, expectErrInfo="Invalid column name: c1") tdSql.query('select last(c2), c2, ts from meters', queryTimes=1) print(str(tdSql.queryResult)) - tdSql.checkRows(1) - tdSql.checkCols(3) - tdSql.checkData(0, 0, None) - tdSql.checkData(0, 1, None) + tdSql.checkRows(0) + #tdSql.checkCols(3) + #tdSql.checkData(0, 0, None) + #tdSql.checkData(0, 1, None) def test_cache_scan_last_row_with_partition_by_tbname(self): tdSql.query('select last(c2) from meters partition by tbname') print(str(tdSql.queryResult)) - tdSql.checkCols(1) - tdSql.checkRows(10) + #tdSql.checkCols(1) + tdSql.checkRows(0) p = subprocess.run(["taos", '-s', "alter table test.meters drop column c2; alter table test.meters add column c1 int"]) p.check_returncode() tdSql.query_success_failed('select last_row(c2) from meters partition by tbname', queryTimes=10, expectErrInfo="Invalid column name: c2") tdSql.query('select last(c1), c1, ts from meters', queryTimes=1) print(str(tdSql.queryResult)) - tdSql.checkRows(1) - tdSql.checkCols(3) - tdSql.checkData(0, 0, None) - tdSql.checkData(0, 1, None) + tdSql.checkRows(0) + #tdSql.checkCols(3) + #tdSql.checkData(0, 0, None) + #tdSql.checkData(0, 1, None) From 6e48a014b446e15ab00004675a57e0cd76bc5eea Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 19 Feb 2024 19:25:21 -0800 Subject: [PATCH 06/12] fix: invalid read memory issue --- source/libs/catalog/src/ctgCache.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 1b693b4e07..f81f9a6954 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -2851,7 +2851,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe CTG_UNLOCK(CTG_READ, &pCache->metaLock); taosHashRelease(dbCache->tbCache, pCache); - ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName); + ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, pTableMeta->tableType, dbFName); res.pRes = pTableMeta; taosArrayPush(ctx->pResList, &res); @@ -2868,7 +2868,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe CTG_UNLOCK(CTG_READ, &pCache->metaLock); taosHashRelease(dbCache->tbCache, pCache); - ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName); + ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, pTableMeta->tableType, dbFName); res.pRes = pTableMeta; taosArrayPush(ctx->pResList, &res); From a36824c4a92492060640ced064173a8eefd49c04 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 20 Feb 2024 03:59:34 +0000 Subject: [PATCH 07/12] fix mem leak --- source/libs/transport/src/transCli.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 798a5bf54f..e2b69dd145 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -961,6 +961,10 @@ static void cliSendCb(uv_write_t* req, int status) { tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost); } } + if (pMsg->msg.contLen == 0 && pMsg->msg.pCont != 0) { + rpcFreeCont(pMsg->msg.pCont); + pMsg->msg.pCont = 0; + } if (status == 0) { tDebug("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); From 22b1db5191ec6727717c9d544b6bb43b99dcd247 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 20 Feb 2024 14:49:52 +0800 Subject: [PATCH 08/12] fix:stream load error --- source/dnode/mnode/impl/src/mndStream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f8a79c8522..4e0c76f6de 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -152,7 +152,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { goto STREAM_DECODE_OVER; } - if (sver != MND_STREAM_VER_NUMBER) { + if (sver < 1 || sver > MND_STREAM_VER_NUMBER) { terrno = 0; mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER); goto STREAM_DECODE_OVER; From d258c3fca6d55e2eeb56c46c8a09ef4fbd79f9b7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 20 Feb 2024 01:06:41 -0800 Subject: [PATCH 09/12] fix: windows crash issue --- source/libs/scheduler/test/schedulerTests.cpp | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index b73de83a82..c52a8599a0 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -60,6 +60,7 @@ extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t int64_t insertJobRefId = 0; int64_t queryJobRefId = 0; +bool schtJobDone = false; uint64_t schtMergeTemplateId = 0x4; uint64_t schtFetchTaskId = 0; uint64_t schtQueryId = 1; @@ -450,6 +451,8 @@ void *schtSendRsp(void *param) { schReleaseJob(job); + schtJobDone = true; + return NULL; } @@ -1028,6 +1031,8 @@ TEST(insertTest, normalCase) { TdThreadAttr thattr; taosThreadAttrInit(&thattr); + schtJobDone = false; + TdThread thread1; taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId); @@ -1045,6 +1050,14 @@ TEST(insertTest, normalCase) { code = schedulerExecJob(&req, &insertJobRefId); ASSERT_EQ(code, 0); + while (true) { + if (schtJobDone) { + break; + } + + taosUsleep(10000); + } + schedulerFreeJob(&insertJobRefId, 0); schedulerDestroy(); From efd2bc6bac24ff26f2c4746ae9b15e8cc367b039 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 20 Feb 2024 17:12:55 +0800 Subject: [PATCH 10/12] fix(tsdb/cache): remove unused block index cache --- source/dnode/vnode/src/tsdb/tsdbCache.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index b333c4c7d4..1ef2a451a7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -21,6 +21,7 @@ #define ROCKS_BATCH_SIZE (4096) +#if 0 static int32_t tsdbOpenBICache(STsdb *pTsdb) { int32_t code = 0; SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5); @@ -52,6 +53,7 @@ static void tsdbCloseBICache(STsdb *pTsdb) { taosThreadMutexDestroy(&pTsdb->biMutex); } } +#endif static int32_t tsdbOpenBCache(STsdb *pTsdb) { int32_t code = 0; @@ -1627,11 +1629,13 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { goto _err; } +#if 0 code = tsdbOpenBICache(pTsdb); if (code != TSDB_CODE_SUCCESS) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } +#endif code = tsdbOpenBCache(pTsdb); if (code != TSDB_CODE_SUCCESS) { @@ -1673,7 +1677,9 @@ void tsdbCloseCache(STsdb *pTsdb) { taosThreadMutexDestroy(&pTsdb->lruMutex); } +#if 0 tsdbCloseBICache(pTsdb); +#endif tsdbCloseBCache(pTsdb); tsdbClosePgCache(pTsdb); tsdbCloseRocksCache(pTsdb); @@ -3447,6 +3453,7 @@ int32_t tsdbCacheGetElems(SVnode *pVnode) { return elems; } +#if 0 static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) { struct { int32_t fid; @@ -3523,7 +3530,6 @@ int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHa return code; } -#ifdef BUILD_NO_CALL int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) { int32_t code = 0; From af1a40a7fffc7feb0b4b9586cfcd3e391933a1bf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 20 Feb 2024 12:09:30 +0000 Subject: [PATCH 11/12] fix duplicate uid --- source/libs/index/src/indexFilter.c | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index 7ed36fbf9e..cb42e60c01 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -17,6 +17,7 @@ #include "index.h" #include "indexComm.h" #include "indexInt.h" +#include "indexUtil.h" #include "nodes.h" #include "querynodes.h" #include "scalar.h" @@ -77,15 +78,15 @@ typedef struct SIFParam { char dbName[TSDB_DB_NAME_LEN]; char colName[TSDB_COL_NAME_LEN * 2 + 4]; - SIndexMetaArg arg; + SIndexMetaArg arg; SMetaDataFilterAPI api; } SIFParam; typedef struct SIFCtx { - int32_t code; - SHashObj *pRes; /* element is SIFParam */ - bool noExec; // true: just iterate condition tree, and add hint to executor plan - SIndexMetaArg arg; + int32_t code; + SHashObj *pRes; /* element is SIFParam */ + bool noExec; // true: just iterate condition tree, and add hint to executor plan + SIndexMetaArg arg; SMetaDataFilterAPI *pAPI; } SIFCtx; @@ -669,6 +670,10 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP if (sifSetFltParam(left, right, &typedata, ¶m) != 0) return -1; } ret = left->api.metaFilterTableIds(arg->metaEx, ¶m, output->result); + if (ret == 0) { + taosArraySort(output->result, uidCompare); + taosArrayRemoveDuplicate(output->result, uidCompare, NULL); + } } return ret; } @@ -875,8 +880,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou } else if (node->condType == LOGIC_COND_TYPE_NOT) { // taosArrayAddAll(output->result, params[m].result); } - taosArraySort(output->result, idxUidCompare); - taosArrayRemoveDuplicate(output->result, idxUidCompare, NULL); + taosArraySort(output->result, uidCompare); + taosArrayRemoveDuplicate(output->result, uidCompare, NULL); } } else { for (int32_t m = 0; m < node->pParameterList->length; m++) { @@ -1016,7 +1021,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { return code; } -static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status, SMetaDataFilterAPI* pAPI) { +static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status, SMetaDataFilterAPI *pAPI) { int32_t code = TSDB_CODE_SUCCESS; if (pNode == NULL) { return TSDB_CODE_QRY_INVALID_INPUT; @@ -1054,7 +1059,8 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status, SMetaDataFilte return code; } -int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status, SMetaDataFilterAPI* pAPI) { +int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status, + SMetaDataFilterAPI *pAPI) { SIdxFltStatus st = idxGetFltStatus(pFilterNode, pAPI); if (st == SFLT_NOT_INDEX) { *status = st; @@ -1081,7 +1087,7 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, return TSDB_CODE_SUCCESS; } -SIdxFltStatus idxGetFltStatus(SNode *pFilterNode, SMetaDataFilterAPI* pAPI) { +SIdxFltStatus idxGetFltStatus(SNode *pFilterNode, SMetaDataFilterAPI *pAPI) { SIdxFltStatus st = SFLT_NOT_INDEX; if (pFilterNode == NULL) { return SFLT_NOT_INDEX; From 05c6c3815abe5173e1f0635904880f53ad44b3fc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 20 Feb 2024 12:45:21 +0000 Subject: [PATCH 12/12] fix duplicate uid --- source/libs/index/src/index.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 3ed66956e8..f41b5525f3 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -459,7 +459,7 @@ static void idxInterRsltDestroy(SArray* results) { static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) { // refactor, merge interResults into fResults by oType - for (int i = 0; i < taosArrayGetSize(in); i--) { + for (int i = 0; i < taosArrayGetSize(in); i++) { SArray* t = taosArrayGetP(in, i); taosArraySort(t, uidCompare); taosArrayRemoveDuplicate(t, uidCompare, NULL);