enh(tsdb/cache): flag empty table when creating
This commit is contained in:
parent
086943ea5c
commit
d59b767af3
|
@ -214,6 +214,13 @@ int32_t tsdbBegin(STsdb* pTsdb);
|
||||||
// int32_t tsdbPrepareCommit(STsdb* pTsdb);
|
// int32_t tsdbPrepareCommit(STsdb* pTsdb);
|
||||||
// int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
|
// int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
|
||||||
int32_t tsdbCacheCommit(STsdb* pTsdb);
|
int32_t tsdbCacheCommit(STsdb* pTsdb);
|
||||||
|
int32_t tsdbCacheNewTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow);
|
||||||
|
int32_t tsdbCacheDropTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow);
|
||||||
|
int32_t tsdbCacheDropSubTables(STsdb* pTsdb, SArray* uids, tb_uid_t suid);
|
||||||
|
int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type);
|
||||||
|
int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type);
|
||||||
|
int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type);
|
||||||
|
int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type);
|
||||||
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
|
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
|
||||||
int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync);
|
int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync);
|
||||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
|
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
|
||||||
|
|
|
@ -303,6 +303,8 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
|
||||||
|
|
||||||
tdbTbcClose(pCtbIdxc);
|
tdbTbcClose(pCtbIdxc);
|
||||||
|
|
||||||
|
(void)tsdbCacheDropSubTables(pMeta->pVnode->pTsdb, tbUidList, pReq->suid);
|
||||||
|
|
||||||
metaWLock(pMeta);
|
metaWLock(pMeta);
|
||||||
|
|
||||||
for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) {
|
for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) {
|
||||||
|
@ -334,6 +336,40 @@ _exit:
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void metaGetSubtables(SMeta *pMeta, int64_t suid, SArray *uids) {
|
||||||
|
if (!uids) return;
|
||||||
|
|
||||||
|
int c = 0;
|
||||||
|
void *pKey = NULL;
|
||||||
|
int nKey = 0;
|
||||||
|
TBC *pCtbIdxc = NULL;
|
||||||
|
|
||||||
|
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL);
|
||||||
|
int rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
|
||||||
|
if (rc < 0) {
|
||||||
|
tdbTbcClose(pCtbIdxc);
|
||||||
|
metaWLock(pMeta);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, NULL, NULL);
|
||||||
|
if (rc < 0) break;
|
||||||
|
|
||||||
|
if (((SCtbIdxKey *)pKey)->suid < suid) {
|
||||||
|
continue;
|
||||||
|
} else if (((SCtbIdxKey *)pKey)->suid > suid) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(uids, &(((SCtbIdxKey *)pKey)->uid));
|
||||||
|
}
|
||||||
|
|
||||||
|
tdbFree(pKey);
|
||||||
|
|
||||||
|
tdbTbcClose(pCtbIdxc);
|
||||||
|
}
|
||||||
|
|
||||||
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
SMetaEntry oStbEntry = {0};
|
SMetaEntry oStbEntry = {0};
|
||||||
SMetaEntry nStbEntry = {0};
|
SMetaEntry nStbEntry = {0};
|
||||||
|
@ -397,9 +433,39 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
nStbEntry.stbEntry.schemaRow = pReq->schemaRow;
|
nStbEntry.stbEntry.schemaRow = pReq->schemaRow;
|
||||||
nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
|
nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
|
||||||
|
|
||||||
int32_t deltaCol = pReq->schemaRow.nCols - oStbEntry.stbEntry.schemaRow.nCols;
|
int nCols = pReq->schemaRow.nCols;
|
||||||
|
int onCols = oStbEntry.stbEntry.schemaRow.nCols;
|
||||||
|
int32_t deltaCol = nCols - onCols;
|
||||||
bool updStat = deltaCol != 0 && !metaTbInFilterCache(pMeta, pReq->name, 1);
|
bool updStat = deltaCol != 0 && !metaTbInFilterCache(pMeta, pReq->name, 1);
|
||||||
|
|
||||||
|
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
||||||
|
STsdb *pTsdb = pMeta->pVnode->pTsdb;
|
||||||
|
SArray *uids = taosArrayInit(8, sizeof(int64_t));
|
||||||
|
if (deltaCol == 1) {
|
||||||
|
int16_t cid = pReq->schemaRow.pSchema[nCols - 1].colId;
|
||||||
|
int8_t col_type = pReq->schemaRow.pSchema[nCols - 1].type;
|
||||||
|
|
||||||
|
metaGetSubtables(pMeta, pReq->suid, uids);
|
||||||
|
tsdbCacheNewSTableColumn(pTsdb, uids, cid, col_type);
|
||||||
|
} else if (deltaCol == -1) {
|
||||||
|
int16_t cid = -1;
|
||||||
|
int8_t col_type = -1;
|
||||||
|
for (int i = 0, j = 0; i < nCols && j < onCols; ++i, ++j) {
|
||||||
|
if (pReq->schemaRow.pSchema[i].colId != oStbEntry.stbEntry.schemaRow.pSchema[j].colId) {
|
||||||
|
cid = oStbEntry.stbEntry.schemaRow.pSchema[j].colId;
|
||||||
|
col_type = oStbEntry.stbEntry.schemaRow.pSchema[j].type;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cid != -1) {
|
||||||
|
metaGetSubtables(pMeta, pReq->suid, uids);
|
||||||
|
tsdbCacheDropSTableColumn(pTsdb, uids, cid, col_type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (uids) taosArrayDestroy(uids);
|
||||||
|
}
|
||||||
|
|
||||||
metaWLock(pMeta);
|
metaWLock(pMeta);
|
||||||
// compare two entry
|
// compare two entry
|
||||||
if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) {
|
if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) {
|
||||||
|
@ -822,6 +888,10 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
|
||||||
metaUidCacheClear(pMeta, me.ctbEntry.suid);
|
metaUidCacheClear(pMeta, me.ctbEntry.suid);
|
||||||
metaTbGroupCacheClear(pMeta, me.ctbEntry.suid);
|
metaTbGroupCacheClear(pMeta, me.ctbEntry.suid);
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
|
|
||||||
|
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
||||||
|
tsdbCacheNewTable(pMeta->pVnode->pTsdb, me.uid, me.ctbEntry.suid, NULL);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
me.ntbEntry.btime = pReq->btime;
|
me.ntbEntry.btime = pReq->btime;
|
||||||
me.ntbEntry.ttlDays = pReq->ttl;
|
me.ntbEntry.ttlDays = pReq->ttl;
|
||||||
|
@ -832,6 +902,10 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
|
||||||
|
|
||||||
++pStats->numOfNTables;
|
++pStats->numOfNTables;
|
||||||
pStats->numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1;
|
pStats->numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1;
|
||||||
|
|
||||||
|
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
||||||
|
tsdbCacheNewTable(pMeta->pVnode->pTsdb, me.uid, -1, &me.ntbEntry.schemaRow);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
||||||
|
@ -896,6 +970,10 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
|
||||||
|
|
||||||
if ((type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE) && tbUids) {
|
if ((type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE) && tbUids) {
|
||||||
taosArrayPush(tbUids, &uid);
|
taosArrayPush(tbUids, &uid);
|
||||||
|
|
||||||
|
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
||||||
|
tsdbCacheDropTable(pMeta->pVnode->pTsdb, uid, suid, NULL);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((type == TSDB_CHILD_TABLE) && tbUid) {
|
if ((type == TSDB_CHILD_TABLE) && tbUid) {
|
||||||
|
@ -930,6 +1008,11 @@ void metaDropTables(SMeta *pMeta, SArray *tbUids) {
|
||||||
}
|
}
|
||||||
tSimpleHashPut(suidHash, &suid, sizeof(tb_uid_t), &nCtbDropped, sizeof(int64_t));
|
tSimpleHashPut(suidHash, &suid, sizeof(tb_uid_t), &nCtbDropped, sizeof(int64_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
||||||
|
tsdbCacheDropTable(pMeta->pVnode->pTsdb, uid, suid, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
metaDebug("batch drop table:%" PRId64, uid);
|
metaDebug("batch drop table:%" PRId64, uid);
|
||||||
}
|
}
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
|
@ -1172,11 +1255,22 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p
|
||||||
metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1, 0);
|
metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1, 0);
|
||||||
metaUidCacheClear(pMeta, e.ctbEntry.suid);
|
metaUidCacheClear(pMeta, e.ctbEntry.suid);
|
||||||
metaTbGroupCacheClear(pMeta, e.ctbEntry.suid);
|
metaTbGroupCacheClear(pMeta, e.ctbEntry.suid);
|
||||||
|
/*
|
||||||
|
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
||||||
|
tsdbCacheDropTable(pMeta->pVnode->pTsdb, e.uid, e.ctbEntry.suid, NULL);
|
||||||
|
}
|
||||||
|
*/
|
||||||
} else if (e.type == TSDB_NORMAL_TABLE) {
|
} else if (e.type == TSDB_NORMAL_TABLE) {
|
||||||
// drop schema.db (todo)
|
// drop schema.db (todo)
|
||||||
|
|
||||||
--pMeta->pVnode->config.vndStats.numOfNTables;
|
--pMeta->pVnode->config.vndStats.numOfNTables;
|
||||||
pMeta->pVnode->config.vndStats.numOfNTimeSeries -= e.ntbEntry.schemaRow.nCols - 1;
|
pMeta->pVnode->config.vndStats.numOfNTimeSeries -= e.ntbEntry.schemaRow.nCols - 1;
|
||||||
|
|
||||||
|
/*
|
||||||
|
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
||||||
|
tsdbCacheDropTable(pMeta->pVnode->pTsdb, e.uid, -1, &e.ntbEntry.schemaRow);
|
||||||
|
}
|
||||||
|
*/
|
||||||
} else if (e.type == TSDB_SUPER_TABLE) {
|
} else if (e.type == TSDB_SUPER_TABLE) {
|
||||||
tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), pMeta->txn);
|
tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), pMeta->txn);
|
||||||
// drop schema.db (todo)
|
// drop schema.db (todo)
|
||||||
|
@ -1364,6 +1458,12 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
||||||
|
|
||||||
++pMeta->pVnode->config.vndStats.numOfNTimeSeries;
|
++pMeta->pVnode->config.vndStats.numOfNTimeSeries;
|
||||||
metaTimeSeriesNotifyCheck(pMeta);
|
metaTimeSeriesNotifyCheck(pMeta);
|
||||||
|
|
||||||
|
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
||||||
|
int16_t cid = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId;
|
||||||
|
int8_t col_type = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type;
|
||||||
|
(void)tsdbCacheNewNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
||||||
if (pColumn == NULL) {
|
if (pColumn == NULL) {
|
||||||
|
@ -1386,6 +1486,13 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
||||||
pSchema->nCols--;
|
pSchema->nCols--;
|
||||||
|
|
||||||
--pMeta->pVnode->config.vndStats.numOfNTimeSeries;
|
--pMeta->pVnode->config.vndStats.numOfNTimeSeries;
|
||||||
|
|
||||||
|
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
||||||
|
int16_t cid = pColumn->colId;
|
||||||
|
int8_t col_type = pColumn->type;
|
||||||
|
|
||||||
|
(void)tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
||||||
if (pColumn == NULL) {
|
if (pColumn == NULL) {
|
||||||
|
|
|
@ -431,25 +431,6 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) {
|
|
||||||
SLastCol *pLastCol = NULL;
|
|
||||||
|
|
||||||
char *err = NULL;
|
|
||||||
size_t vlen = 0;
|
|
||||||
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
|
|
||||||
size_t klen = ROCKS_KEY_LEN;
|
|
||||||
char *value = NULL;
|
|
||||||
value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err);
|
|
||||||
if (NULL != err) {
|
|
||||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
|
||||||
rocksdb_free(err);
|
|
||||||
}
|
|
||||||
|
|
||||||
pLastCol = tsdbCacheDeserialize(value);
|
|
||||||
|
|
||||||
return pLastCol;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void reallocVarData(SColVal *pColVal) {
|
static void reallocVarData(SColVal *pColVal) {
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||||
uint8_t *pVal = pColVal->value.pData;
|
uint8_t *pVal = pColVal->value.pData;
|
||||||
|
@ -476,6 +457,355 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud
|
||||||
taosMemoryFree(value);
|
taosMemoryFree(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SLRUCache *pCache = pTsdb->lruCache;
|
||||||
|
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||||
|
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1};
|
||||||
|
SLastCol *pLastCol = &noneCol;
|
||||||
|
|
||||||
|
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||||
|
*pTmpLastCol = *pLastCol;
|
||||||
|
pLastCol = pTmpLastCol;
|
||||||
|
|
||||||
|
reallocVarData(&pLastCol->colVal);
|
||||||
|
size_t charge = sizeof(*pLastCol);
|
||||||
|
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
|
||||||
|
charge += pLastCol->colVal.value.nData;
|
||||||
|
}
|
||||||
|
|
||||||
|
SLastKey *pLastKey = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
|
||||||
|
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
|
||||||
|
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
||||||
|
if (status != TAOS_LRU_STATUS_OK) {
|
||||||
|
code = -1;
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
// store result back to rocks cache
|
||||||
|
char *value = NULL;
|
||||||
|
size_t vlen = 0;
|
||||||
|
tsdbCacheSerialize(pLastCol, &value, &vlen);
|
||||||
|
|
||||||
|
SLastKey *key = pLastKey;
|
||||||
|
size_t klen = ROCKS_KEY_LEN;
|
||||||
|
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
|
||||||
|
taosMemoryFree(value);
|
||||||
|
*/
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
|
||||||
|
int32_t code = 0;
|
||||||
|
char *err = NULL;
|
||||||
|
|
||||||
|
SLRUCache *pCache = pTsdb->lruCache;
|
||||||
|
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||||
|
|
||||||
|
taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState);
|
||||||
|
|
||||||
|
rocksMayWrite(pTsdb, true, false, false);
|
||||||
|
rocksMayWrite(pTsdb, true, true, false);
|
||||||
|
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
|
||||||
|
|
||||||
|
if (NULL != err) {
|
||||||
|
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||||
|
rocksdb_free(err);
|
||||||
|
code = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
// build keys & multi get from rocks
|
||||||
|
char **keys_list = taosMemoryCalloc(2, sizeof(char *));
|
||||||
|
size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
|
||||||
|
const size_t klen = ROCKS_KEY_LEN;
|
||||||
|
|
||||||
|
char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
|
||||||
|
((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid};
|
||||||
|
((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid};
|
||||||
|
|
||||||
|
keys_list[0] = keys;
|
||||||
|
keys_list[1] = keys + sizeof(SLastKey);
|
||||||
|
keys_list_sizes[0] = klen;
|
||||||
|
keys_list_sizes[1] = klen;
|
||||||
|
|
||||||
|
char **values_list = taosMemoryCalloc(2, sizeof(char *));
|
||||||
|
size_t *values_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
|
||||||
|
char **errs = taosMemoryCalloc(2, sizeof(char *));
|
||||||
|
|
||||||
|
// rocksMayWrite(pTsdb, true, false, false);
|
||||||
|
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, 2, (const char *const *)keys_list, keys_list_sizes,
|
||||||
|
values_list, values_list_sizes, errs);
|
||||||
|
|
||||||
|
for (int i = 0; i < 2; ++i) {
|
||||||
|
if (errs[i]) {
|
||||||
|
rocksdb_free(errs[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosMemoryFree(errs);
|
||||||
|
|
||||||
|
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||||
|
{
|
||||||
|
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0]);
|
||||||
|
if (NULL != pLastCol) {
|
||||||
|
rocksdb_writebatch_delete(wb, keys_list[0], klen);
|
||||||
|
}
|
||||||
|
pLastCol = tsdbCacheDeserialize(values_list[1]);
|
||||||
|
if (NULL != pLastCol) {
|
||||||
|
rocksdb_writebatch_delete(wb, keys_list[1], klen);
|
||||||
|
}
|
||||||
|
|
||||||
|
rocksdb_free(values_list[0]);
|
||||||
|
rocksdb_free(values_list[1]);
|
||||||
|
|
||||||
|
bool erase = false;
|
||||||
|
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[0], klen);
|
||||||
|
if (h) {
|
||||||
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
|
||||||
|
erase = true;
|
||||||
|
|
||||||
|
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
|
||||||
|
}
|
||||||
|
if (erase) {
|
||||||
|
taosLRUCacheErase(pTsdb->lruCache, keys_list[0], klen);
|
||||||
|
}
|
||||||
|
|
||||||
|
erase = false;
|
||||||
|
h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[1], klen);
|
||||||
|
if (h) {
|
||||||
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
|
||||||
|
erase = true;
|
||||||
|
|
||||||
|
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
|
||||||
|
}
|
||||||
|
if (erase) {
|
||||||
|
taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(keys_list[0]);
|
||||||
|
|
||||||
|
taosMemoryFree(keys_list);
|
||||||
|
taosMemoryFree(keys_list_sizes);
|
||||||
|
taosMemoryFree(values_list);
|
||||||
|
taosMemoryFree(values_list_sizes);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
|
if (suid < 0) {
|
||||||
|
int nCols = pSchemaRow->nCols;
|
||||||
|
for (int i = 0; i < nCols; ++i) {
|
||||||
|
int16_t cid = pSchemaRow->pSchema[i].colId;
|
||||||
|
int8_t col_type = pSchemaRow->pSchema[i].type;
|
||||||
|
|
||||||
|
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
|
||||||
|
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
STSchema *pTSchema = NULL;
|
||||||
|
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int nCols = pTSchema->numOfCols;
|
||||||
|
for (int i = 0; i < nCols; ++i) {
|
||||||
|
int16_t cid = pTSchema->columns[i].colId;
|
||||||
|
int8_t col_type = pTSchema->columns[i].type;
|
||||||
|
|
||||||
|
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
|
||||||
|
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pTSchema);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
|
(void)tsdbCacheCommitNoLock(pTsdb);
|
||||||
|
|
||||||
|
if (suid < 0) {
|
||||||
|
int nCols = pSchemaRow->nCols;
|
||||||
|
for (int i = 0; i < nCols; ++i) {
|
||||||
|
int16_t cid = pSchemaRow->pSchema[i].colId;
|
||||||
|
int8_t col_type = pSchemaRow->pSchema[i].type;
|
||||||
|
|
||||||
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
|
||||||
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
STSchema *pTSchema = NULL;
|
||||||
|
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int nCols = pTSchema->numOfCols;
|
||||||
|
for (int i = 0; i < nCols; ++i) {
|
||||||
|
int16_t cid = pTSchema->columns[i].colId;
|
||||||
|
int8_t col_type = pTSchema->columns[i].type;
|
||||||
|
|
||||||
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
|
||||||
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pTSchema);
|
||||||
|
}
|
||||||
|
|
||||||
|
rocksMayWrite(pTsdb, true, false, false);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
|
(void)tsdbCacheCommitNoLock(pTsdb);
|
||||||
|
|
||||||
|
STSchema *pTSchema = NULL;
|
||||||
|
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
|
||||||
|
int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
|
||||||
|
|
||||||
|
int nCols = pTSchema->numOfCols;
|
||||||
|
for (int i = 0; i < nCols; ++i) {
|
||||||
|
int16_t cid = pTSchema->columns[i].colId;
|
||||||
|
int8_t col_type = pTSchema->columns[i].type;
|
||||||
|
|
||||||
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
|
||||||
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pTSchema);
|
||||||
|
|
||||||
|
rocksMayWrite(pTsdb, true, false, false);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
|
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
|
||||||
|
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
|
||||||
|
|
||||||
|
// rocksMayWrite(pTsdb, true, false, false);
|
||||||
|
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||||
|
//(void)tsdbCacheCommit(pTsdb);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
|
(void)tsdbCacheCommitNoLock(pTsdb);
|
||||||
|
|
||||||
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
|
||||||
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
|
||||||
|
|
||||||
|
rocksMayWrite(pTsdb, true, false, true);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
|
for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
|
||||||
|
tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
|
||||||
|
|
||||||
|
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
|
||||||
|
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// rocksMayWrite(pTsdb, true, false, false);
|
||||||
|
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||||
|
//(void)tsdbCacheCommit(pTsdb);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
|
(void)tsdbCacheCommitNoLock(pTsdb);
|
||||||
|
|
||||||
|
for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
|
||||||
|
int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
|
||||||
|
|
||||||
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
|
||||||
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
rocksMayWrite(pTsdb, true, false, true);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) {
|
||||||
|
SLastCol *pLastCol = NULL;
|
||||||
|
|
||||||
|
char *err = NULL;
|
||||||
|
size_t vlen = 0;
|
||||||
|
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
|
||||||
|
size_t klen = ROCKS_KEY_LEN;
|
||||||
|
char *value = NULL;
|
||||||
|
value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err);
|
||||||
|
if (NULL != err) {
|
||||||
|
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||||
|
rocksdb_free(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
pLastCol = tsdbCacheDeserialize(value);
|
||||||
|
|
||||||
|
return pLastCol;
|
||||||
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int idx;
|
int idx;
|
||||||
SLastKey key;
|
SLastKey key;
|
||||||
|
|
Loading…
Reference in New Issue