fix: refactor lastcache ser/des for compatibility
This commit is contained in:
parent
488486b172
commit
7ae4415890
|
@ -35,7 +35,6 @@ extern "C" {
|
|||
#define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2
|
||||
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
|
||||
#define CACHESCAN_RETRIEVE_LAST 0x8
|
||||
#define CACHESCAN_RETRIEVE_PK 0x10
|
||||
|
||||
#define META_READER_LOCK 0x0
|
||||
#define META_READER_NOLOCK 0x1
|
||||
|
|
|
@ -894,13 +894,16 @@ typedef enum {
|
|||
READER_EXEC_ROWS = 0x2,
|
||||
} EExecMode;
|
||||
|
||||
#define LAST_COL_VERSION (0x1)
|
||||
|
||||
typedef struct {
|
||||
SRowKey rowKey;
|
||||
int8_t dirty;
|
||||
SColVal colVal;
|
||||
int8_t version;
|
||||
} SLastCol;
|
||||
|
||||
typedef struct {
|
||||
typedef struct {
|
||||
union {
|
||||
int64_t val;
|
||||
struct {
|
||||
|
|
|
@ -130,20 +130,14 @@ static void tsdbClosePgCache(STsdb *pTsdb) {
|
|||
enum {
|
||||
LFLAG_LAST_ROW = 0,
|
||||
LFLAG_LAST = 1,
|
||||
LFLAG_VERSION = 1 << 2,
|
||||
LFLAG_VERSION_BITS = (1 << 2 | 1 << 3),
|
||||
LFLAG_PRIMARY_KEY = CACHESCAN_RETRIEVE_PK,
|
||||
};
|
||||
|
||||
#define LAST_KEY_HAS_VERSION ((k).lflag & LFLAG_VERSION_BITS)
|
||||
|
||||
typedef struct {
|
||||
tb_uid_t uid;
|
||||
int16_t cid;
|
||||
int8_t lflag;
|
||||
} SLastKey;
|
||||
|
||||
#define HAS_PRIMARY_KEY(k) (((k).lflag & LFLAG_PRIMARY_KEY) == LFLAG_PRIMARY_KEY)
|
||||
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
|
||||
#define IS_LAST_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
|
||||
|
||||
|
@ -336,93 +330,100 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
|
|||
}
|
||||
}
|
||||
|
||||
// note: new object do not own colVal's resource, just copy the pointer
|
||||
static SLastCol *tsdbCacheConvertLastColV0(SLastColV0 *pLastColV0) {
|
||||
SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
if (pLastCol == NULL) return NULL;
|
||||
pLastCol->rowKey.ts = pLastColV0->ts;
|
||||
pLastCol->rowKey.numOfPKs = 0;
|
||||
pLastCol->dirty = pLastColV0->dirty;
|
||||
pLastCol->colVal.cid = pLastColV0->colVal.cid;
|
||||
pLastCol->colVal.flag = pLastColV0->colVal.flag;
|
||||
pLastCol->colVal.value.type = pLastColV0->colVal.type;
|
||||
pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
|
||||
static SLastColV0 *tsdbCacheDeserializeV0(char const *value, size_t *inOutOffset) {
|
||||
SLastColV0 *pLastColV0 = NULL;
|
||||
size_t localOffset = 0;
|
||||
|
||||
return pLastCol;
|
||||
if (!value) {
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
pLastColV0 = taosMemoryMalloc(sizeof(SLastColV0));
|
||||
*pLastColV0 = *(SLastColV0 *)(value);
|
||||
|
||||
localOffset = sizeof(*pLastColV0);
|
||||
|
||||
SColValV0 *pColValV0 = &pLastColV0->colVal;
|
||||
if (IS_VAR_DATA_TYPE(pColValV0->type)) {
|
||||
if (pColValV0->value.nData > 0) {
|
||||
pColValV0->value.pData = (char *)value + localOffset;
|
||||
localOffset += pColValV0->value.nData;
|
||||
} else {
|
||||
pColValV0->value.pData = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
_OUT:
|
||||
*inOutOffset += localOffset;
|
||||
return pLastColV0;
|
||||
}
|
||||
|
||||
static SLastCol *tsdbCacheDeserializeV0(char const *value) {
|
||||
static SLastCol *tsdbCacheDeserialize(char const *value, size_t size) {
|
||||
if (!value) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SLastColV0 *pLastColV0 = (SLastColV0 *)value;
|
||||
SColValV0 *pColVal = &pLastColV0->colVal;
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||
if (pColVal->value.nData > 0) {
|
||||
pColVal->value.pData = (char *)value + sizeof(*pLastColV0);
|
||||
} else {
|
||||
pColVal->value.pData = NULL;
|
||||
}
|
||||
}
|
||||
size_t offset = 0;
|
||||
|
||||
return tsdbCacheConvertLastColV0(pLastColV0);
|
||||
}
|
||||
|
||||
static SLastCol *tsdbCacheDeserializeV1(char const *value) {
|
||||
if (!value) {
|
||||
SLastColV0 *pLastColV0 = tsdbCacheDeserializeV0(value, &offset);
|
||||
if (NULL == pLastColV0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SLastCol *pLastCol = taosMemoryMalloc(sizeof(SLastCol));
|
||||
*pLastCol = *(SLastCol *)(value);
|
||||
pLastCol->rowKey.ts = pLastColV0->ts;
|
||||
pLastCol->dirty = pLastColV0->dirty;
|
||||
pLastCol->colVal.cid = pLastColV0->colVal.cid;
|
||||
pLastCol->colVal.flag = pLastColV0->colVal.flag;
|
||||
pLastCol->colVal.value.type = pLastColV0->colVal.type;
|
||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
|
||||
pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
|
||||
pLastCol->colVal.value.pData = pLastColV0->colVal.value.pData;
|
||||
} else {
|
||||
pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pLastColV0);
|
||||
|
||||
if (offset == size) {
|
||||
// version 0
|
||||
pLastCol->version = LAST_COL_VERSION;
|
||||
pLastCol->rowKey.numOfPKs = 0;
|
||||
memset(pLastCol->rowKey.pks, 0, sizeof(pLastCol->rowKey.pks));
|
||||
return pLastCol;
|
||||
}
|
||||
|
||||
pLastCol->version = *(int8_t *)(value + offset);
|
||||
offset += sizeof(int8_t);
|
||||
|
||||
pLastCol->rowKey.numOfPKs = *(uint8_t *)(value + offset);
|
||||
offset += sizeof(uint8_t);
|
||||
|
||||
char *currentPos = (char *)value + sizeof(*pLastCol);
|
||||
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||
SValue *pValue = &pLastCol->rowKey.pks[i];
|
||||
*pValue = *(SValue *)(value + offset);
|
||||
offset += sizeof(SValue);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pValue->type)) {
|
||||
if (pValue->nData > 0) {
|
||||
pValue->pData = currentPos;
|
||||
currentPos += pValue->nData;
|
||||
pValue->pData = (char *)value + offset;
|
||||
offset += pValue->nData;
|
||||
} else {
|
||||
pValue->pData = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SColVal *pColVal = &pLastCol->colVal;
|
||||
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
||||
if (pColVal->value.nData > 0) {
|
||||
pColVal->value.pData = currentPos;
|
||||
currentPos += pColVal->value.nData;
|
||||
} else {
|
||||
pColVal->value.pData = NULL;
|
||||
pValue->val = *(int64_t *)(value + offset);
|
||||
offset += sizeof(int64_t);
|
||||
}
|
||||
}
|
||||
|
||||
return pLastCol;
|
||||
}
|
||||
|
||||
static SLastCol *tsdbCacheDeserialize(char const *value, int8_t lflag) {
|
||||
if (!value) {
|
||||
if (size < offset) {
|
||||
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int8_t version = lflag & LFLAG_VERSION_BITS;
|
||||
|
||||
SLastCol *lastCol = NULL;
|
||||
switch (version) {
|
||||
case 0:
|
||||
lastCol = tsdbCacheDeserializeV0(value);
|
||||
break;
|
||||
case LFLAG_VERSION:
|
||||
lastCol = tsdbCacheDeserializeV1(value);
|
||||
break;
|
||||
defalut:
|
||||
tsdbError("invalid last key version %" PRId8 " , lflag:%" PRId8, version, lflag);
|
||||
break;
|
||||
}
|
||||
return lastCol;
|
||||
return pLastCol;
|
||||
}
|
||||
|
||||
static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) {
|
||||
|
@ -435,42 +436,82 @@ static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) {
|
|||
return from->nData;
|
||||
}
|
||||
|
||||
static uint32_t tsdbCacheCopyVarDataToV0(SValue *from, SValueV0 *to) {
|
||||
ASSERT(from->nData >= 0);
|
||||
if (from->nData > 0) {
|
||||
memcpy(to->pData, from->pData, from->nData);
|
||||
}
|
||||
to->nData = from->nData;
|
||||
return from->nData;
|
||||
}
|
||||
|
||||
/*
|
||||
typedef struct {
|
||||
SLastColV0 lastColV0;
|
||||
char colData[];
|
||||
int8_t version;
|
||||
uint8_t numOfPKs;
|
||||
SValue pks[0];
|
||||
char pk0Data[];
|
||||
SValue pks[1];
|
||||
char pk1Data[];
|
||||
...
|
||||
} SLastColDisk;
|
||||
*/
|
||||
static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
|
||||
SColVal *pColVal = &pLastCol->colVal;
|
||||
size_t length = sizeof(*pLastCol);
|
||||
size_t length = sizeof(SLastColV0);
|
||||
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
||||
length += pColVal->value.nData;
|
||||
}
|
||||
|
||||
uint8_t numOfPKs = pLastCol->rowKey.numOfPKs;
|
||||
|
||||
length += sizeof(int8_t) + sizeof(uint8_t) + (sizeof(SValue) * numOfPKs); // version + numOfPKs + pks
|
||||
|
||||
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||
if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
|
||||
length += pLastCol->rowKey.pks[i].nData;
|
||||
}
|
||||
}
|
||||
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
||||
length += pColVal->value.nData;
|
||||
}
|
||||
|
||||
*value = taosMemoryMalloc(length);
|
||||
|
||||
// copy last col
|
||||
SLastCol *pToLastCol = (SLastCol *)(*value);
|
||||
*pToLastCol = *pLastCol;
|
||||
char *currentPos = *value + sizeof(*pLastCol);
|
||||
SLastColV0 *pToLastColV0 = (SLastColV0 *)(*value);
|
||||
pToLastColV0->ts = pLastCol->rowKey.ts;
|
||||
pToLastColV0->dirty = pLastCol->dirty;
|
||||
pToLastColV0->colVal.cid = pColVal->cid;
|
||||
pToLastColV0->colVal.flag = pColVal->flag;
|
||||
pToLastColV0->colVal.type = pColVal->value.type;
|
||||
|
||||
// copy var data pks
|
||||
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||
SValue *pFromValue = &pLastCol->rowKey.pks[i];
|
||||
if (IS_VAR_DATA_TYPE(pFromValue->type)) {
|
||||
SValue *pToValue = &pToLastCol->rowKey.pks[i];
|
||||
pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos;
|
||||
currentPos += tsdbCacheCopyVarData(pFromValue, pToValue);
|
||||
}
|
||||
}
|
||||
char *currentPos = *value + sizeof(*pToLastColV0);
|
||||
|
||||
// copy var data value
|
||||
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
||||
SValue *pFromValue = &pColVal->value;
|
||||
SValue *pToValue = &pToLastCol->colVal.value;
|
||||
SValueV0 *pToValue = &pToLastColV0->colVal.value;
|
||||
pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos;
|
||||
currentPos += tsdbCacheCopyVarData(pFromValue, pToValue);
|
||||
currentPos += tsdbCacheCopyVarDataToV0(pFromValue, pToValue);
|
||||
} else {
|
||||
pToLastColV0->colVal.value.val = pColVal->value.val;
|
||||
}
|
||||
|
||||
*(int8_t *)currentPos = LAST_COL_VERSION;
|
||||
currentPos += sizeof(int8_t);
|
||||
|
||||
// copy var data pks
|
||||
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||
SValue *pFromValue = &pLastCol->rowKey.pks[i];
|
||||
SValue *pToValue = (SValue *)currentPos;
|
||||
*pToValue = *pFromValue;
|
||||
currentPos += sizeof(SValue);
|
||||
if (IS_VAR_DATA_TYPE(pFromValue->type)) {
|
||||
pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos;
|
||||
currentPos += tsdbCacheCopyVarData(pFromValue, pToValue);
|
||||
}
|
||||
}
|
||||
|
||||
*size = length;
|
||||
}
|
||||
|
||||
|
@ -603,7 +644,7 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
|
|||
charge += pLastCol->colVal.value.nData;
|
||||
}
|
||||
|
||||
SLastKey *pLastKey = &(SLastKey){.lflag = lflag | LFLAG_VERSION, .uid = uid, .cid = cid};
|
||||
SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
|
||||
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
|
||||
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
||||
if (status != TAOS_LRU_STATUS_OK) {
|
||||
|
@ -653,11 +694,9 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
|
|||
size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
|
||||
const size_t klen = ROCKS_KEY_LEN;
|
||||
|
||||
int8_t lflag = hasPrimaryKey ? LFLAG_PRIMARY_KEY : 0;
|
||||
|
||||
char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
|
||||
((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST | LFLAG_VERSION, .uid = uid, .cid = cid};
|
||||
((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid};
|
||||
((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
|
||||
((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};
|
||||
|
||||
keys_list[0] = keys;
|
||||
keys_list[1] = keys + sizeof(SLastKey);
|
||||
|
@ -681,13 +720,13 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
|
|||
|
||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||
{
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0], ((SLastKey*)keys_list[0])->lflag);
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0], values_list_sizes[0]);
|
||||
if (NULL != pLastCol) {
|
||||
rocksdb_writebatch_delete(wb, keys_list[0], klen);
|
||||
}
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
|
||||
pLastCol = tsdbCacheDeserialize(values_list[1], ((SLastKey*)keys_list[1])->lflag);
|
||||
pLastCol = tsdbCacheDeserialize(values_list[1], values_list_sizes[1]);
|
||||
if (NULL != pLastCol) {
|
||||
rocksdb_writebatch_delete(wb, keys_list[1], klen);
|
||||
}
|
||||
|
@ -733,18 +772,12 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap
|
|||
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||
|
||||
if (suid < 0) {
|
||||
int8_t lflag = 0;
|
||||
int nCols = pSchemaRow->nCols;
|
||||
if (nCols >= 2) {
|
||||
lflag = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0;
|
||||
}
|
||||
|
||||
for (int i = 0; i < nCols; ++i) {
|
||||
for (int i = 0; i < pSchemaRow->nCols; ++i) {
|
||||
int16_t cid = pSchemaRow->pSchema[i].colId;
|
||||
int8_t col_type = pSchemaRow->pSchema[i].type;
|
||||
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST_ROW);
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST);
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
|
||||
}
|
||||
} else {
|
||||
STSchema *pTSchema = NULL;
|
||||
|
@ -754,18 +787,12 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap
|
|||
return -1;
|
||||
}
|
||||
|
||||
int8_t lflag = 0;
|
||||
int nCols = pTSchema->numOfCols;
|
||||
if (nCols >= 2) {
|
||||
lflag = (pTSchema->columns[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0;
|
||||
}
|
||||
|
||||
for (int i = 0; i < nCols; ++i) {
|
||||
for (int i = 0; i < pTSchema->numOfCols; ++i) {
|
||||
int16_t cid = pTSchema->columns[i].colId;
|
||||
int8_t col_type = pTSchema->columns[i].type;
|
||||
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST_ROW);
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST);
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
|
||||
}
|
||||
|
||||
taosMemoryFree(pTSchema);
|
||||
|
@ -1024,14 +1051,13 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
|||
STsdbRowKey tsdbRowKey = {0};
|
||||
tsdbRowGetKey(pRow, &tsdbRowKey);
|
||||
SRowKey *pRowKey = &tsdbRowKey.key;
|
||||
int8_t lflag = (pRowKey->numOfPKs != 0) ? LFLAG_PRIMARY_KEY : 0;
|
||||
|
||||
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
|
||||
int16_t cid = pColVal->cid;
|
||||
|
||||
SLastKey *key = &(SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid};
|
||||
SLastKey *key = &(SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};
|
||||
size_t klen = ROCKS_KEY_LEN;
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
||||
if (h) {
|
||||
|
@ -1049,7 +1075,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
|||
}
|
||||
|
||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
||||
key->lflag = lflag | LFLAG_LAST | LFLAG_VERSION;
|
||||
key->lflag = LFLAG_LAST;
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
||||
if (h) {
|
||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||
|
@ -1094,7 +1120,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
|||
SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx;
|
||||
// SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx);
|
||||
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey*)keys_list[i])->lflag);
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]);
|
||||
SLastCol *PToFree = pLastCol;
|
||||
|
||||
if (IS_LAST_ROW_KEY(idxKey->key)) {
|
||||
|
@ -1106,8 +1132,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
|||
if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
|
||||
char *value = NULL;
|
||||
size_t vlen = 0;
|
||||
tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value,
|
||||
&vlen);
|
||||
tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen);
|
||||
|
||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||
|
||||
|
@ -1401,7 +1426,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
|
||||
SIdxKey *idxKey = taosArrayGet(remainCols, 0);
|
||||
if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
SLastKey *key = &(SLastKey){.lflag = ltype | LFLAG_VERSION, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
|
||||
SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
|
||||
|
||||
taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key});
|
||||
}
|
||||
|
@ -1570,7 +1595,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
|||
|
||||
SLRUCache *pCache = pTsdb->lruCache;
|
||||
for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey*)keys_list[i])->lflag);
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]);
|
||||
SLastCol *PToFree = pLastCol;
|
||||
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
|
||||
if (pLastCol) {
|
||||
|
@ -1637,7 +1662,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
|||
for (int i = 0; i < num_keys; ++i) {
|
||||
int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
|
||||
|
||||
SLastKey key = {.lflag = ltype | LFLAG_VERSION, .uid = uid, .cid = cid};
|
||||
SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
|
||||
// for select last_row, last case
|
||||
int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
|
||||
if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
|
||||
|
@ -1726,17 +1751,12 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
||||
const size_t klen = ROCKS_KEY_LEN;
|
||||
|
||||
int8_t lflag = 0;
|
||||
if (num_keys >= 2) {
|
||||
lflag = (pTSchema->columns[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0;
|
||||
}
|
||||
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
int16_t cid = pTSchema->columns[i].colId;
|
||||
|
||||
char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
|
||||
((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST | LFLAG_VERSION, .uid = uid, .cid = cid};
|
||||
((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW | LFLAG_VERSION, .uid = uid, .cid = cid};
|
||||
((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
|
||||
((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};
|
||||
|
||||
keys_list[i] = keys;
|
||||
keys_list[num_keys + i] = keys + sizeof(SLastKey);
|
||||
|
@ -1766,14 +1786,14 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
|
||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], ((SLastKey *)keys_list[i])->lflag);
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]);
|
||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
|
||||
rocksdb_writebatch_delete(wb, keys_list[i], klen);
|
||||
}
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
|
||||
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys], ((SLastKey *)keys_list[i + num_keys])->lflag);
|
||||
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys], values_list_sizes[i + num_keys]);
|
||||
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
|
||||
rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen);
|
||||
}
|
||||
|
|
|
@ -385,9 +385,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
}
|
||||
|
||||
int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
|
||||
if (pr->rowKey.numOfPKs > 0) {
|
||||
ltype |= CACHESCAN_RETRIEVE_PK;
|
||||
}
|
||||
|
||||
STableKeyInfo* pTableList = pr->pTableList;
|
||||
|
||||
|
|
|
@ -160,9 +160,6 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
|||
// partition by tbname
|
||||
if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) {
|
||||
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull);
|
||||
if (pInfo->numOfPks > 0) {
|
||||
pInfo->retrieveType |= CACHESCAN_RETRIEVE_PK;
|
||||
}
|
||||
|
||||
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
|
||||
|
||||
|
|
Loading…
Reference in New Issue