feat: pk insert/drop last cache
This commit is contained in:
parent
af39260ec7
commit
b649a73a19
|
@ -895,11 +895,17 @@ typedef enum {
|
||||||
} EExecMode;
|
} EExecMode;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
TSKEY ts;
|
SRowKey rowKey;
|
||||||
int8_t dirty;
|
int8_t dirty;
|
||||||
SColVal colVal;
|
SColVal colVal;
|
||||||
} SLastCol;
|
} SLastCol;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
TSKEY ts;
|
||||||
|
int8_t dirty;
|
||||||
|
SColVal colVal;
|
||||||
|
} SLastColV1;
|
||||||
|
|
||||||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||||
void tsdbCloseCache(STsdb *pTsdb);
|
void tsdbCloseCache(STsdb *pTsdb);
|
||||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
|
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
|
||||||
|
|
|
@ -234,9 +234,9 @@ int32_t tsdbCacheNewTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapp
|
||||||
int32_t tsdbCacheDropTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow);
|
int32_t tsdbCacheDropTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow);
|
||||||
int32_t tsdbCacheDropSubTables(STsdb* pTsdb, SArray* uids, tb_uid_t suid);
|
int32_t tsdbCacheDropSubTables(STsdb* pTsdb, SArray* uids, tb_uid_t suid);
|
||||||
int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type);
|
int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type);
|
||||||
int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type);
|
int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, bool hasPrimayKey);
|
||||||
int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type);
|
int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type);
|
||||||
int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type);
|
int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey);
|
||||||
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
|
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
|
||||||
int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync);
|
int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync);
|
||||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
|
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
|
||||||
|
|
|
@ -449,18 +449,20 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
tsdbCacheNewSTableColumn(pTsdb, uids, cid, col_type);
|
tsdbCacheNewSTableColumn(pTsdb, uids, cid, col_type);
|
||||||
} else if (deltaCol == -1) {
|
} else if (deltaCol == -1) {
|
||||||
int16_t cid = -1;
|
int16_t cid = -1;
|
||||||
int8_t col_type = -1;
|
bool hasPrimaryKey = false;
|
||||||
|
if (onCols >= 2) {
|
||||||
|
hasPrimaryKey = (oStbEntry.stbEntry.schemaRow.pSchema[1].flags & COL_IS_KEY) ? true : false;
|
||||||
|
}
|
||||||
for (int i = 0, j = 0; i < nCols && j < onCols; ++i, ++j) {
|
for (int i = 0, j = 0; i < nCols && j < onCols; ++i, ++j) {
|
||||||
if (pReq->schemaRow.pSchema[i].colId != oStbEntry.stbEntry.schemaRow.pSchema[j].colId) {
|
if (pReq->schemaRow.pSchema[i].colId != oStbEntry.stbEntry.schemaRow.pSchema[j].colId) {
|
||||||
cid = oStbEntry.stbEntry.schemaRow.pSchema[j].colId;
|
cid = oStbEntry.stbEntry.schemaRow.pSchema[j].colId;
|
||||||
col_type = oStbEntry.stbEntry.schemaRow.pSchema[j].type;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cid != -1) {
|
if (cid != -1) {
|
||||||
metaGetSubtables(pMeta, pReq->suid, uids);
|
metaGetSubtables(pMeta, pReq->suid, uids);
|
||||||
tsdbCacheDropSTableColumn(pTsdb, uids, cid, col_type);
|
tsdbCacheDropSTableColumn(pTsdb, uids, cid, hasPrimaryKey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (uids) taosArrayDestroy(uids);
|
if (uids) taosArrayDestroy(uids);
|
||||||
|
@ -1478,6 +1480,11 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
||||||
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
|
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
bool hasPrimayKey = false;
|
||||||
|
if (pSchema->nCols >= 2) {
|
||||||
|
hasPrimayKey = pSchema->pSchema[1].flags & COL_IS_KEY ? true : false;
|
||||||
|
}
|
||||||
|
|
||||||
pSchema->version++;
|
pSchema->version++;
|
||||||
tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema);
|
tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema);
|
||||||
if (tlen) {
|
if (tlen) {
|
||||||
|
@ -1489,9 +1496,8 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
||||||
|
|
||||||
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
||||||
int16_t cid = pColumn->colId;
|
int16_t cid = pColumn->colId;
|
||||||
int8_t col_type = pColumn->type;
|
|
||||||
|
|
||||||
(void)tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type);
|
(void)tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, hasPrimayKey);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
||||||
|
|
|
@ -127,12 +127,25 @@ static void tsdbClosePgCache(STsdb *pTsdb) {
|
||||||
|
|
||||||
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
|
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
|
||||||
|
|
||||||
|
enum {
|
||||||
|
LFLAG_LAST_ROW = 0,
|
||||||
|
LFLAG_LAST = 1,
|
||||||
|
LFLAG_PRIMARY_KEY = (1 << 4),
|
||||||
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
int16_t cid;
|
int16_t cid;
|
||||||
int8_t ltype;
|
int8_t lflag;
|
||||||
} SLastKey;
|
} SLastKey;
|
||||||
|
|
||||||
|
#define LAST_COL_VERSION_BASE (((int64_t)(0x1)) << 63)
|
||||||
|
#define LAST_COL_VERSION (LAST_COL_VERSION_BASE + 2)
|
||||||
|
|
||||||
|
#define HAS_PRIMARY_KEY(k) (((k).lflag & LFLAG_PRIMARY_KEY) == LFLAG_PRIMARY_KEY)
|
||||||
|
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
|
||||||
|
#define IS_LAST_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
|
||||||
|
|
||||||
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
|
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
|
||||||
SVnode *pVnode = pTsdb->pVnode;
|
SVnode *pVnode = pTsdb->pVnode;
|
||||||
vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
|
vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
|
||||||
|
@ -167,9 +180,9 @@ static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lhs->ltype < rhs->ltype) {
|
if (lhs->lflag < rhs->lflag) {
|
||||||
return -1;
|
return -1;
|
||||||
} else if (lhs->ltype > rhs->ltype) {
|
} else if (lhs->lflag > rhs->lflag) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -322,16 +335,62 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SLastCol *tsdbCacheDeserialize(char const *value) {
|
// note: new object do not own colVal's resource, just copy the pointer
|
||||||
|
static SLastCol *tsdbCacheConvertLastColV1(SLastColV1 *pLastColV1) {
|
||||||
|
SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||||
|
if (pLastCol == NULL) return NULL;
|
||||||
|
pLastCol->rowKey.ts = pLastColV1->ts;
|
||||||
|
pLastCol->rowKey.numOfPKs = 0;
|
||||||
|
pLastCol->dirty = pLastColV1->dirty;
|
||||||
|
pLastCol->colVal = pLastColV1->colVal;
|
||||||
|
|
||||||
|
return pLastCol;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SLastCol *tsdbCacheDeserializeV1(char const *value) {
|
||||||
if (!value) {
|
if (!value) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SLastCol *pLastCol = (SLastCol *)value;
|
SLastColV1 *pLastColV1 = (SLastColV1 *)value;
|
||||||
|
SColVal *pColVal = &pLastColV1->colVal;
|
||||||
|
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
||||||
|
if (pColVal->value.nData > 0) {
|
||||||
|
pColVal->value.pData = (char *)value + sizeof(*pLastColV1);
|
||||||
|
} else {
|
||||||
|
pColVal->value.pData = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return tsdbCacheConvertLastColV1(pLastColV1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static SLastCol *tsdbCacheDeserializeV2(char const *value) {
|
||||||
|
if (!value) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SLastCol *pLastCol = taosMemoryMalloc(sizeof(SLastCol));
|
||||||
|
*pLastCol = *(SLastCol *)(value);
|
||||||
|
|
||||||
|
char* currentPos = (char *)value + sizeof(*pLastCol);
|
||||||
|
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||||
|
SValue* pValue = &pLastCol->rowKey.pks[i];
|
||||||
|
if (IS_VAR_DATA_TYPE(pValue->type)) {
|
||||||
|
if (pValue->nData > 0) {
|
||||||
|
pValue->pData = currentPos;
|
||||||
|
currentPos += pValue->nData;
|
||||||
|
} else {
|
||||||
|
pValue->pData = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SColVal *pColVal = &pLastCol->colVal;
|
SColVal *pColVal = &pLastCol->colVal;
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
||||||
if (pColVal->value.nData > 0) {
|
if (pColVal->value.nData > 0) {
|
||||||
pColVal->value.pData = (char *)value + sizeof(*pLastCol);
|
pColVal->value.pData = currentPos;
|
||||||
|
currentPos += pColVal->value.nData;
|
||||||
} else {
|
} else {
|
||||||
pColVal->value.pData = NULL;
|
pColVal->value.pData = NULL;
|
||||||
}
|
}
|
||||||
|
@ -340,25 +399,68 @@ static SLastCol *tsdbCacheDeserialize(char const *value) {
|
||||||
return pLastCol;
|
return pLastCol;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SLastCol *tsdbCacheDeserialize(char const *value) {
|
||||||
|
if (!value) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool hasVersion = ((*(int64_t *)value) & LAST_COL_VERSION_BASE) == LAST_COL_VERSION_BASE;
|
||||||
|
if (!hasVersion) {
|
||||||
|
return tsdbCacheDeserializeV1(value);
|
||||||
|
}
|
||||||
|
return tsdbCacheDeserializeV2(value + sizeof(int64_t));
|
||||||
|
}
|
||||||
|
|
||||||
|
static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) {
|
||||||
|
ASSERT(from->nData >= 0);
|
||||||
|
if (from->nData > 0) {
|
||||||
|
memcpy(to->pData, from->pData, from->nData);
|
||||||
|
}
|
||||||
|
to->type = from->type;
|
||||||
|
to->nData = from->nData;
|
||||||
|
return from->nData;
|
||||||
|
}
|
||||||
|
|
||||||
static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
|
static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
|
||||||
SColVal *pColVal = &pLastCol->colVal;
|
SColVal *pColVal = &pLastCol->colVal;
|
||||||
size_t length = sizeof(*pLastCol);
|
size_t length = sizeof(int64_t) + sizeof(*pLastCol);
|
||||||
|
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||||
|
if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
|
||||||
|
length += pLastCol->rowKey.pks[i].nData;
|
||||||
|
}
|
||||||
|
}
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
||||||
length += pColVal->value.nData;
|
length += pColVal->value.nData;
|
||||||
}
|
}
|
||||||
*value = taosMemoryMalloc(length);
|
|
||||||
|
|
||||||
*(SLastCol *)(*value) = *pLastCol;
|
// set version
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
*value = taosMemoryMalloc(length);
|
||||||
uint8_t *pVal = pColVal->value.pData;
|
char *currentPos = *value;
|
||||||
SColVal *pDColVal = &((SLastCol *)(*value))->colVal;
|
*(int64_t *)currentPos = LAST_COL_VERSION;
|
||||||
pDColVal->value.pData = *value + sizeof(*pLastCol);
|
currentPos += sizeof(int64_t);
|
||||||
if (pColVal->value.nData > 0) {
|
|
||||||
memcpy(pDColVal->value.pData, pVal, pColVal->value.nData);
|
// copy last col
|
||||||
} else {
|
SLastCol* pToLastCol = (SLastCol *)currentPos;
|
||||||
pDColVal->value.pData = NULL;
|
*pToLastCol = *pLastCol;
|
||||||
|
currentPos += sizeof(*pLastCol);
|
||||||
|
|
||||||
|
// copy var data pks
|
||||||
|
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||||
|
SValue *pFromValue = &pLastCol->rowKey.pks[i];
|
||||||
|
if (IS_VAR_DATA_TYPE(pFromValue->type)) {
|
||||||
|
SValue *pToValue = &pToLastCol->rowKey.pks[i];
|
||||||
|
pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos;
|
||||||
|
currentPos += tsdbCacheCopyVarData(pFromValue, pToValue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// copy var data value
|
||||||
|
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
||||||
|
SValue *pFromValue = &pColVal->value;
|
||||||
|
SValue *pToValue = &pToLastCol->colVal.value;
|
||||||
|
pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos;
|
||||||
|
currentPos += tsdbCacheCopyVarData(pFromValue, pToValue);
|
||||||
|
}
|
||||||
*size = length;
|
*size = length;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -459,12 +561,15 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud
|
||||||
taosMemoryFree(value);
|
taosMemoryFree(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) {
|
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
SLRUCache *pCache = pTsdb->lruCache;
|
SLRUCache *pCache = pTsdb->lruCache;
|
||||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||||
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1};
|
SRowKey noneRowKey = {0};
|
||||||
|
noneRowKey.ts = TSKEY_MIN;
|
||||||
|
noneRowKey.numOfPKs = 0;
|
||||||
|
SLastCol noneCol = {.rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1};
|
||||||
SLastCol *pLastCol = &noneCol;
|
SLastCol *pLastCol = &noneCol;
|
||||||
|
|
||||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||||
|
@ -477,7 +582,7 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
|
||||||
charge += pLastCol->colVal.value.nData;
|
charge += pLastCol->colVal.value.nData;
|
||||||
}
|
}
|
||||||
|
|
||||||
SLastKey *pLastKey = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
|
SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
|
||||||
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
|
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
|
||||||
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
||||||
if (status != TAOS_LRU_STATUS_OK) {
|
if (status != TAOS_LRU_STATUS_OK) {
|
||||||
|
@ -519,7 +624,7 @@ int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) {
|
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
// build keys & multi get from rocks
|
// build keys & multi get from rocks
|
||||||
|
@ -527,9 +632,11 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
|
||||||
size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
|
size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
|
||||||
const size_t klen = ROCKS_KEY_LEN;
|
const size_t klen = ROCKS_KEY_LEN;
|
||||||
|
|
||||||
|
int8_t lflag = hasPrimaryKey ? LFLAG_PRIMARY_KEY : 0;
|
||||||
|
|
||||||
char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
|
char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
|
||||||
((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid};
|
((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST, .uid = uid, .cid = cid};
|
||||||
((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid};
|
((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid};
|
||||||
|
|
||||||
keys_list[0] = keys;
|
keys_list[0] = keys;
|
||||||
keys_list[1] = keys + sizeof(SLastKey);
|
keys_list[1] = keys + sizeof(SLastKey);
|
||||||
|
@ -588,6 +695,8 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
|
||||||
if (erase) {
|
if (erase) {
|
||||||
taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen);
|
taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pLastCol);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(keys_list[0]);
|
taosMemoryFree(keys_list[0]);
|
||||||
|
@ -606,13 +715,18 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap
|
||||||
taosThreadMutexLock(&pTsdb->lruMutex);
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
if (suid < 0) {
|
if (suid < 0) {
|
||||||
|
int8_t lflag = 0;
|
||||||
int nCols = pSchemaRow->nCols;
|
int nCols = pSchemaRow->nCols;
|
||||||
|
if (nCols >= 2) {
|
||||||
|
lflag = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0;
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < nCols; ++i) {
|
for (int i = 0; i < nCols; ++i) {
|
||||||
int16_t cid = pSchemaRow->pSchema[i].colId;
|
int16_t cid = pSchemaRow->pSchema[i].colId;
|
||||||
int8_t col_type = pSchemaRow->pSchema[i].type;
|
int8_t col_type = pSchemaRow->pSchema[i].type;
|
||||||
|
|
||||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
|
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST_ROW);
|
||||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
|
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
STSchema *pTSchema = NULL;
|
STSchema *pTSchema = NULL;
|
||||||
|
@ -622,13 +736,18 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int8_t lflag = 0;
|
||||||
int nCols = pTSchema->numOfCols;
|
int nCols = pTSchema->numOfCols;
|
||||||
|
if (nCols >= 2) {
|
||||||
|
lflag = (pTSchema->columns[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0;
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < nCols; ++i) {
|
for (int i = 0; i < nCols; ++i) {
|
||||||
int16_t cid = pTSchema->columns[i].colId;
|
int16_t cid = pTSchema->columns[i].colId;
|
||||||
int8_t col_type = pTSchema->columns[i].type;
|
int8_t col_type = pTSchema->columns[i].type;
|
||||||
|
|
||||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
|
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST_ROW);
|
||||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
|
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pTSchema);
|
taosMemoryFree(pTSchema);
|
||||||
|
@ -646,14 +765,17 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra
|
||||||
|
|
||||||
(void)tsdbCacheCommitNoLock(pTsdb);
|
(void)tsdbCacheCommitNoLock(pTsdb);
|
||||||
|
|
||||||
if (suid < 0) {
|
if (pSchemaRow != NULL) {
|
||||||
|
bool hasPrimayKey = false;
|
||||||
int nCols = pSchemaRow->nCols;
|
int nCols = pSchemaRow->nCols;
|
||||||
|
if (nCols >= 2) {
|
||||||
|
hasPrimayKey = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? true : false;
|
||||||
|
}
|
||||||
for (int i = 0; i < nCols; ++i) {
|
for (int i = 0; i < nCols; ++i) {
|
||||||
int16_t cid = pSchemaRow->pSchema[i].colId;
|
int16_t cid = pSchemaRow->pSchema[i].colId;
|
||||||
int8_t col_type = pSchemaRow->pSchema[i].type;
|
int8_t col_type = pSchemaRow->pSchema[i].type;
|
||||||
|
|
||||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
STSchema *pTSchema = NULL;
|
STSchema *pTSchema = NULL;
|
||||||
|
@ -663,13 +785,16 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool hasPrimayKey = false;
|
||||||
int nCols = pTSchema->numOfCols;
|
int nCols = pTSchema->numOfCols;
|
||||||
|
if (nCols >= 2) {
|
||||||
|
hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
|
||||||
|
}
|
||||||
for (int i = 0; i < nCols; ++i) {
|
for (int i = 0; i < nCols; ++i) {
|
||||||
int16_t cid = pTSchema->columns[i].colId;
|
int16_t cid = pTSchema->columns[i].colId;
|
||||||
int8_t col_type = pTSchema->columns[i].type;
|
int8_t col_type = pTSchema->columns[i].type;
|
||||||
|
|
||||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pTSchema);
|
taosMemoryFree(pTSchema);
|
||||||
|
@ -698,13 +823,17 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
|
||||||
for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
|
for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
|
||||||
int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
|
int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
|
||||||
|
|
||||||
|
bool hasPrimayKey = false;
|
||||||
int nCols = pTSchema->numOfCols;
|
int nCols = pTSchema->numOfCols;
|
||||||
|
if (nCols >= 2) {
|
||||||
|
hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < nCols; ++i) {
|
for (int i = 0; i < nCols; ++i) {
|
||||||
int16_t cid = pTSchema->columns[i].colId;
|
int16_t cid = pTSchema->columns[i].colId;
|
||||||
int8_t col_type = pTSchema->columns[i].type;
|
int8_t col_type = pTSchema->columns[i].type;
|
||||||
|
|
||||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -732,15 +861,14 @@ int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
|
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
taosThreadMutexLock(&pTsdb->lruMutex);
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
(void)tsdbCacheCommitNoLock(pTsdb);
|
(void)tsdbCacheCommitNoLock(pTsdb);
|
||||||
|
|
||||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
|
|
||||||
|
|
||||||
rocksMayWrite(pTsdb, true, false, true);
|
rocksMayWrite(pTsdb, true, false, true);
|
||||||
|
|
||||||
|
@ -768,7 +896,7 @@ int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
|
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
taosThreadMutexLock(&pTsdb->lruMutex);
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
|
@ -778,8 +906,7 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_
|
||||||
for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
|
for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
|
||||||
int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
|
int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
|
||||||
|
|
||||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
|
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
rocksMayWrite(pTsdb, true, false, true);
|
rocksMayWrite(pTsdb, true, false, true);
|
||||||
|
@ -794,6 +921,58 @@ typedef struct {
|
||||||
SLastKey key;
|
SLastKey key;
|
||||||
} SIdxKey;
|
} SIdxKey;
|
||||||
|
|
||||||
|
static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal *pColVal) {
|
||||||
|
uint8_t *pVal = NULL;
|
||||||
|
int nData = 0;
|
||||||
|
|
||||||
|
// update rowkey
|
||||||
|
pLastCol->rowKey.ts = pRowKey->ts;
|
||||||
|
pLastCol->rowKey.numOfPKs = pRowKey->numOfPKs;
|
||||||
|
for (int8_t i = 0; i < pRowKey->numOfPKs; i++) {
|
||||||
|
SValue *pPKValue = &pLastCol->rowKey.pks[i];
|
||||||
|
SValue *pNewPKValue = &pRowKey->pks[i];
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pPKValue->type)) {
|
||||||
|
pVal = pPKValue->pData;
|
||||||
|
nData = pPKValue->nData;
|
||||||
|
}
|
||||||
|
*pPKValue = *pNewPKValue;
|
||||||
|
if (IS_VAR_DATA_TYPE(pPKValue->type)) {
|
||||||
|
if (nData < pPKValue->nData) {
|
||||||
|
taosMemoryFree(pVal);
|
||||||
|
pPKValue->pData = taosMemoryCalloc(1, pNewPKValue->nData);
|
||||||
|
} else {
|
||||||
|
pPKValue->pData = pVal;
|
||||||
|
}
|
||||||
|
if (pNewPKValue->nData) {
|
||||||
|
memcpy(pPKValue->pData, pNewPKValue->pData, pNewPKValue->nData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// update colval
|
||||||
|
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
||||||
|
nData = pLastCol->colVal.value.nData;
|
||||||
|
pVal = pLastCol->colVal.value.pData;
|
||||||
|
}
|
||||||
|
pLastCol->colVal = *pColVal;
|
||||||
|
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
||||||
|
if (nData < pColVal->value.nData) {
|
||||||
|
taosMemoryFree(pVal);
|
||||||
|
pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData);
|
||||||
|
} else {
|
||||||
|
pLastCol->colVal.value.pData = pVal;
|
||||||
|
}
|
||||||
|
if (pColVal->value.nData) {
|
||||||
|
memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pLastCol->dirty) {
|
||||||
|
pLastCol->dirty = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
|
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -821,46 +1000,28 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
||||||
|
|
||||||
// 3, build keys & multi get from rocks
|
// 3, build keys & multi get from rocks
|
||||||
int num_keys = TARRAY_SIZE(aColVal);
|
int num_keys = TARRAY_SIZE(aColVal);
|
||||||
TSKEY keyTs = TSDBROW_TS(pRow);
|
|
||||||
SArray *remainCols = NULL;
|
SArray *remainCols = NULL;
|
||||||
SLRUCache *pCache = pTsdb->lruCache;
|
SLRUCache *pCache = pTsdb->lruCache;
|
||||||
|
|
||||||
|
STsdbRowKey tsdbRowKey = {0};
|
||||||
|
tsdbRowGetKey(pRow, &tsdbRowKey);
|
||||||
|
SRowKey *pRowKey = &tsdbRowKey.key;
|
||||||
|
int8_t lflag = (pRowKey->numOfPKs != 0) ? LFLAG_PRIMARY_KEY : 0;
|
||||||
|
|
||||||
taosThreadMutexLock(&pTsdb->lruMutex);
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
for (int i = 0; i < num_keys; ++i) {
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
|
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
|
||||||
int16_t cid = pColVal->cid;
|
int16_t cid = pColVal->cid;
|
||||||
|
|
||||||
SLastKey *key = &(SLastKey){.ltype = 0, .uid = uid, .cid = cid};
|
SLastKey *key = &(SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid};
|
||||||
size_t klen = ROCKS_KEY_LEN;
|
size_t klen = ROCKS_KEY_LEN;
|
||||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
||||||
if (h) {
|
if (h) {
|
||||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||||
|
|
||||||
if (pLastCol->ts <= keyTs) {
|
if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) {
|
||||||
uint8_t *pVal = NULL;
|
tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal);
|
||||||
int nData = pLastCol->colVal.value.nData;
|
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
|
||||||
pVal = pLastCol->colVal.value.pData;
|
|
||||||
}
|
}
|
||||||
pLastCol->ts = keyTs;
|
|
||||||
pLastCol->colVal = *pColVal;
|
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
|
||||||
if (nData < pColVal->value.nData) {
|
|
||||||
taosMemoryFree(pVal);
|
|
||||||
pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData);
|
|
||||||
} else {
|
|
||||||
pLastCol->colVal.value.pData = pVal;
|
|
||||||
}
|
|
||||||
if (pColVal->value.nData) {
|
|
||||||
memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!pLastCol->dirty) {
|
|
||||||
pLastCol->dirty = 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
taosLRUCacheRelease(pCache, h, false);
|
||||||
} else {
|
} else {
|
||||||
if (!remainCols) {
|
if (!remainCols) {
|
||||||
|
@ -870,36 +1031,14 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
||||||
}
|
}
|
||||||
|
|
||||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
if (COL_VAL_IS_VALUE(pColVal)) {
|
||||||
key->ltype = 1;
|
key->lflag = lflag | LFLAG_LAST;
|
||||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
||||||
if (h) {
|
if (h) {
|
||||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||||
|
|
||||||
if (pLastCol->ts <= keyTs) {
|
if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) {
|
||||||
uint8_t *pVal = NULL;
|
tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal);
|
||||||
int nData = pLastCol->colVal.value.nData;
|
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
|
||||||
pVal = pLastCol->colVal.value.pData;
|
|
||||||
}
|
}
|
||||||
pLastCol->ts = keyTs;
|
|
||||||
pLastCol->colVal = *pColVal;
|
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
|
||||||
if (nData < pColVal->value.nData) {
|
|
||||||
taosMemoryFree(pVal);
|
|
||||||
pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData);
|
|
||||||
} else {
|
|
||||||
pLastCol->colVal.value.pData = pVal;
|
|
||||||
}
|
|
||||||
if (pColVal->value.nData) {
|
|
||||||
memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!pLastCol->dirty) {
|
|
||||||
pLastCol->dirty = 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
taosLRUCacheRelease(pCache, h, false);
|
||||||
} else {
|
} else {
|
||||||
if (!remainCols) {
|
if (!remainCols) {
|
||||||
|
@ -943,11 +1082,11 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
||||||
|
|
||||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
||||||
|
|
||||||
if (idxKey->key.ltype == 0) {
|
if (IS_LAST_ROW_KEY(idxKey->key)) {
|
||||||
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
|
if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) {
|
||||||
char *value = NULL;
|
char *value = NULL;
|
||||||
size_t vlen = 0;
|
size_t vlen = 0;
|
||||||
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
|
tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen);
|
||||||
// SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
|
// SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
|
||||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||||
|
|
||||||
|
@ -976,10 +1115,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
if (COL_VAL_IS_VALUE(pColVal)) {
|
||||||
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
|
if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) {
|
||||||
char *value = NULL;
|
char *value = NULL;
|
||||||
size_t vlen = 0;
|
size_t vlen = 0;
|
||||||
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
|
tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen);
|
||||||
// SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
|
// SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
|
||||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||||
|
|
||||||
|
@ -1007,6 +1146,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
||||||
taosMemoryFree(value);
|
taosMemoryFree(value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pLastCol);
|
||||||
}
|
}
|
||||||
|
|
||||||
rocksdb_free(values_list[i]);
|
rocksdb_free(values_list[i]);
|
||||||
|
@ -1409,6 +1550,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
||||||
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
||||||
taosArrayRemove(remainCols, j);
|
taosArrayRemove(remainCols, j);
|
||||||
|
|
||||||
|
taosMemoryFree(pLastCol);
|
||||||
taosMemoryFree(values_list[i]);
|
taosMemoryFree(values_list[i]);
|
||||||
} else {
|
} else {
|
||||||
++j;
|
++j;
|
||||||
|
@ -1517,12 +1659,18 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
||||||
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
||||||
const size_t klen = ROCKS_KEY_LEN;
|
const size_t klen = ROCKS_KEY_LEN;
|
||||||
|
|
||||||
|
int8_t lflag = 0;
|
||||||
|
if (num_keys >= 2) {
|
||||||
|
lflag = (pTSchema->columns[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0;
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < num_keys; ++i) {
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
int16_t cid = pTSchema->columns[i].colId;
|
int16_t cid = pTSchema->columns[i].colId;
|
||||||
|
|
||||||
char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
|
char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
|
||||||
((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid};
|
((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST, .uid = uid, .cid = cid};
|
||||||
((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid};
|
((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid};
|
||||||
|
|
||||||
keys_list[i] = keys;
|
keys_list[i] = keys;
|
||||||
keys_list[num_keys + i] = keys + sizeof(SLastKey);
|
keys_list[num_keys + i] = keys + sizeof(SLastKey);
|
||||||
|
@ -1554,15 +1702,16 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
for (int i = 0; i < num_keys; ++i) {
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
||||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||||
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
|
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
|
||||||
rocksdb_writebatch_delete(wb, keys_list[i], klen);
|
rocksdb_writebatch_delete(wb, keys_list[i], klen);
|
||||||
}
|
}
|
||||||
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
|
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
|
||||||
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
|
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
|
||||||
rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen);
|
rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen);
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
||||||
|
|
||||||
|
taosMemoryFree(pLastCol);
|
||||||
rocksdb_free(values_list[i]);
|
rocksdb_free(values_list[i]);
|
||||||
rocksdb_free(values_list[i + num_keys]);
|
rocksdb_free(values_list[i + num_keys]);
|
||||||
|
|
||||||
|
@ -1575,7 +1724,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
if (pLastCol->dirty) {
|
if (pLastCol->dirty) {
|
||||||
pLastCol->dirty = 0;
|
pLastCol->dirty = 0;
|
||||||
}
|
}
|
||||||
if (pLastCol->ts <= eKey && pLastCol->ts >= sKey) {
|
if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
|
||||||
erase = true;
|
erase = true;
|
||||||
}
|
}
|
||||||
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
|
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
|
||||||
|
@ -1591,7 +1740,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
if (pLastCol->dirty) {
|
if (pLastCol->dirty) {
|
||||||
pLastCol->dirty = 0;
|
pLastCol->dirty = 0;
|
||||||
}
|
}
|
||||||
if (pLastCol->ts <= eKey && pLastCol->ts >= sKey) {
|
if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
|
||||||
erase = true;
|
erase = true;
|
||||||
}
|
}
|
||||||
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
|
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
|
||||||
|
|
|
@ -194,18 +194,8 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
||||||
pMemTable->nDel++;
|
pMemTable->nDel++;
|
||||||
pMemTable->minVer = TMIN(pMemTable->minVer, version);
|
pMemTable->minVer = TMIN(pMemTable->minVer, version);
|
||||||
pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
|
pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
|
||||||
/*
|
|
||||||
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
|
|
||||||
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
|
|
||||||
tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
// if (eKey >= pTbData->maxKey && sKey <= pTbData->maxKey) {
|
|
||||||
tsdbCacheDel(pTsdb, suid, uid, sKey, eKey);
|
tsdbCacheDel(pTsdb, suid, uid, sKey, eKey);
|
||||||
//}
|
|
||||||
|
|
||||||
tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
||||||
" at version %" PRId64,
|
" at version %" PRId64,
|
||||||
|
|
Loading…
Reference in New Issue