fix: adjust SValueV0 data order
This commit is contained in:
parent
7ae4415890
commit
457b98fee8
|
@ -900,32 +900,8 @@ typedef struct {
|
|||
SRowKey rowKey;
|
||||
int8_t dirty;
|
||||
SColVal colVal;
|
||||
int8_t version;
|
||||
} SLastCol;
|
||||
|
||||
typedef struct {
|
||||
union {
|
||||
int64_t val;
|
||||
struct {
|
||||
uint8_t *pData;
|
||||
uint32_t nData;
|
||||
};
|
||||
};
|
||||
} SValueV0;
|
||||
|
||||
typedef struct {
|
||||
int16_t cid;
|
||||
int8_t type;
|
||||
int8_t flag;
|
||||
SValueV0 value;
|
||||
} SColValV0;
|
||||
|
||||
typedef struct {
|
||||
TSKEY ts;
|
||||
int8_t dirty;
|
||||
SColValV0 colVal;
|
||||
} SLastColV0;
|
||||
|
||||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||
void tsdbCloseCache(STsdb *pTsdb);
|
||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
|
||||
|
|
|
@ -330,32 +330,41 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
|
|||
}
|
||||
}
|
||||
|
||||
static SLastColV0 *tsdbCacheDeserializeV0(char const *value, size_t *inOutOffset) {
|
||||
SLastColV0 *pLastColV0 = NULL;
|
||||
size_t localOffset = 0;
|
||||
typedef struct {
|
||||
TSKEY ts;
|
||||
int8_t dirty;
|
||||
struct {
|
||||
int16_t cid;
|
||||
int8_t type;
|
||||
int8_t flag;
|
||||
union {
|
||||
int64_t val;
|
||||
struct {
|
||||
uint32_t nData;
|
||||
uint8_t *pData;
|
||||
};
|
||||
} value;
|
||||
} colVal;
|
||||
} SLastColV0;
|
||||
|
||||
if (!value) {
|
||||
goto _OUT;
|
||||
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
|
||||
SLastColV0 *pLastColV0 = (SLastColV0 *)value;
|
||||
|
||||
pLastCol->rowKey.ts = pLastColV0->ts;
|
||||
pLastCol->rowKey.numOfPKs = 0;
|
||||
pLastCol->dirty = pLastColV0->dirty;
|
||||
pLastCol->colVal.cid = pLastColV0->colVal.cid;
|
||||
pLastCol->colVal.flag = pLastColV0->colVal.flag;
|
||||
pLastCol->colVal.value.type = pLastColV0->colVal.type;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
|
||||
pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
|
||||
pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
|
||||
return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
|
||||
} else {
|
||||
pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
|
||||
return sizeof(SLastColV0);
|
||||
}
|
||||
|
||||
pLastColV0 = taosMemoryMalloc(sizeof(SLastColV0));
|
||||
*pLastColV0 = *(SLastColV0 *)(value);
|
||||
|
||||
localOffset = sizeof(*pLastColV0);
|
||||
|
||||
SColValV0 *pColValV0 = &pLastColV0->colVal;
|
||||
if (IS_VAR_DATA_TYPE(pColValV0->type)) {
|
||||
if (pColValV0->value.nData > 0) {
|
||||
pColValV0->value.pData = (char *)value + localOffset;
|
||||
localOffset += pColValV0->value.nData;
|
||||
} else {
|
||||
pColValV0->value.pData = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
_OUT:
|
||||
*inOutOffset += localOffset;
|
||||
return pLastColV0;
|
||||
}
|
||||
|
||||
static SLastCol *tsdbCacheDeserialize(char const *value, size_t size) {
|
||||
|
@ -363,61 +372,41 @@ static SLastCol *tsdbCacheDeserialize(char const *value, size_t size) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
size_t offset = 0;
|
||||
|
||||
SLastColV0 *pLastColV0 = tsdbCacheDeserializeV0(value, &offset);
|
||||
if (NULL == pLastColV0) {
|
||||
SLastCol* pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
if (NULL == pLastCol) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SLastCol *pLastCol = taosMemoryMalloc(sizeof(SLastCol));
|
||||
pLastCol->rowKey.ts = pLastColV0->ts;
|
||||
pLastCol->dirty = pLastColV0->dirty;
|
||||
pLastCol->colVal.cid = pLastColV0->colVal.cid;
|
||||
pLastCol->colVal.flag = pLastColV0->colVal.flag;
|
||||
pLastCol->colVal.value.type = pLastColV0->colVal.type;
|
||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
|
||||
pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
|
||||
pLastCol->colVal.value.pData = pLastColV0->colVal.value.pData;
|
||||
} else {
|
||||
pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pLastColV0);
|
||||
|
||||
int32_t offset = tsdbCacheDeserializeV0(value, pLastCol);
|
||||
if (offset == size) {
|
||||
// version 0
|
||||
pLastCol->version = LAST_COL_VERSION;
|
||||
pLastCol->rowKey.numOfPKs = 0;
|
||||
memset(pLastCol->rowKey.pks, 0, sizeof(pLastCol->rowKey.pks));
|
||||
return pLastCol;
|
||||
} else if (offset > size) {
|
||||
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pLastCol->version = *(int8_t *)(value + offset);
|
||||
// version
|
||||
int8_t version = *(int8_t *)(value + offset);
|
||||
offset += sizeof(int8_t);
|
||||
|
||||
// numOfPKs
|
||||
pLastCol->rowKey.numOfPKs = *(uint8_t *)(value + offset);
|
||||
offset += sizeof(uint8_t);
|
||||
|
||||
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||
SValue *pValue = &pLastCol->rowKey.pks[i];
|
||||
*pValue = *(SValue *)(value + offset);
|
||||
// pks
|
||||
for (int32_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||
pLastCol->rowKey.pks[i] = *(SValue *)(value + offset);
|
||||
offset += sizeof(SValue);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pValue->type)) {
|
||||
if (pValue->nData > 0) {
|
||||
pValue->pData = (char *)value + offset;
|
||||
offset += pValue->nData;
|
||||
} else {
|
||||
pValue->pData = NULL;
|
||||
}
|
||||
} else {
|
||||
pValue->val = *(int64_t *)(value + offset);
|
||||
offset += sizeof(int64_t);
|
||||
if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
|
||||
pLastCol->rowKey.pks[i].pData = (uint8_t *)value + offset;
|
||||
offset += pLastCol->rowKey.pks[i].nData;
|
||||
}
|
||||
}
|
||||
|
||||
if (size < offset) {
|
||||
if (offset > size) {
|
||||
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
return NULL;
|
||||
|
@ -426,25 +415,6 @@ static SLastCol *tsdbCacheDeserialize(char const *value, size_t size) {
|
|||
return pLastCol;
|
||||
}
|
||||
|
||||
static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) {
|
||||
ASSERT(from->nData >= 0);
|
||||
if (from->nData > 0) {
|
||||
memcpy(to->pData, from->pData, from->nData);
|
||||
}
|
||||
to->type = from->type;
|
||||
to->nData = from->nData;
|
||||
return from->nData;
|
||||
}
|
||||
|
||||
static uint32_t tsdbCacheCopyVarDataToV0(SValue *from, SValueV0 *to) {
|
||||
ASSERT(from->nData >= 0);
|
||||
if (from->nData > 0) {
|
||||
memcpy(to->pData, from->pData, from->nData);
|
||||
}
|
||||
to->nData = from->nData;
|
||||
return from->nData;
|
||||
}
|
||||
|
||||
/*
|
||||
typedef struct {
|
||||
SLastColV0 lastColV0;
|
||||
|
@ -458,61 +428,61 @@ typedef struct {
|
|||
...
|
||||
} SLastColDisk;
|
||||
*/
|
||||
static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
|
||||
SColVal *pColVal = &pLastCol->colVal;
|
||||
size_t length = sizeof(SLastColV0);
|
||||
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
||||
length += pColVal->value.nData;
|
||||
}
|
||||
static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
|
||||
SLastColV0 *pLastColV0 = (SLastColV0 *)value;
|
||||
|
||||
uint8_t numOfPKs = pLastCol->rowKey.numOfPKs;
|
||||
|
||||
length += sizeof(int8_t) + sizeof(uint8_t) + (sizeof(SValue) * numOfPKs); // version + numOfPKs + pks
|
||||
|
||||
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||
if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
|
||||
length += pLastCol->rowKey.pks[i].nData;
|
||||
}
|
||||
}
|
||||
|
||||
*value = taosMemoryMalloc(length);
|
||||
|
||||
// copy last col
|
||||
SLastColV0 *pToLastColV0 = (SLastColV0 *)(*value);
|
||||
pToLastColV0->ts = pLastCol->rowKey.ts;
|
||||
pToLastColV0->dirty = pLastCol->dirty;
|
||||
pToLastColV0->colVal.cid = pColVal->cid;
|
||||
pToLastColV0->colVal.flag = pColVal->flag;
|
||||
pToLastColV0->colVal.type = pColVal->value.type;
|
||||
|
||||
char *currentPos = *value + sizeof(*pToLastColV0);
|
||||
|
||||
// copy var data value
|
||||
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
|
||||
SValue *pFromValue = &pColVal->value;
|
||||
SValueV0 *pToValue = &pToLastColV0->colVal.value;
|
||||
pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos;
|
||||
currentPos += tsdbCacheCopyVarDataToV0(pFromValue, pToValue);
|
||||
pLastColV0->ts = pLastCol->rowKey.ts;
|
||||
pLastColV0->dirty = pLastCol->dirty;
|
||||
pLastColV0->colVal.cid = pLastCol->colVal.cid;
|
||||
pLastColV0->colVal.flag = pLastCol->colVal.flag;
|
||||
pLastColV0->colVal.type = pLastCol->colVal.value.type;
|
||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
|
||||
pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
|
||||
memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
|
||||
return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
|
||||
} else {
|
||||
pToLastColV0->colVal.value.val = pColVal->value.val;
|
||||
pLastColV0->colVal.value.val = pLastCol->colVal.value.val;
|
||||
return sizeof(SLastColV0);
|
||||
}
|
||||
|
||||
*(int8_t *)currentPos = LAST_COL_VERSION;
|
||||
currentPos += sizeof(int8_t);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
|
||||
*size = sizeof(SLastColV0);
|
||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
|
||||
*size += pLastCol->colVal.value.nData;
|
||||
}
|
||||
*size += sizeof(uint8_t) + sizeof(uint8_t); // version + numOfPKs
|
||||
|
||||
// copy var data pks
|
||||
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||
SValue *pFromValue = &pLastCol->rowKey.pks[i];
|
||||
SValue *pToValue = (SValue *)currentPos;
|
||||
*pToValue = *pFromValue;
|
||||
currentPos += sizeof(SValue);
|
||||
if (IS_VAR_DATA_TYPE(pFromValue->type)) {
|
||||
pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos;
|
||||
currentPos += tsdbCacheCopyVarData(pFromValue, pToValue);
|
||||
*size += sizeof(SValue);
|
||||
if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
|
||||
*size += pLastCol->rowKey.pks[i].nData;
|
||||
}
|
||||
}
|
||||
|
||||
*size = length;
|
||||
*value = taosMemoryMalloc(*size);
|
||||
|
||||
int32_t offset = tsdbCacheSerializeV0(*value, pLastCol);
|
||||
|
||||
// version
|
||||
((uint8_t *)(*value + offset))[0] = LAST_COL_VERSION;
|
||||
offset++;
|
||||
|
||||
// numOfPKs
|
||||
((uint8_t *)(*value + offset))[0] = pLastCol->rowKey.numOfPKs;
|
||||
offset++;
|
||||
|
||||
// pks
|
||||
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||
((SValue *)(*value + offset))[0] = pLastCol->rowKey.pks[i];
|
||||
offset += sizeof(SValue);
|
||||
if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
|
||||
memcpy(*value + offset, pLastCol->rowKey.pks[i].pData, pLastCol->rowKey.pks[i].nData);
|
||||
offset += pLastCol->rowKey.pks[i].nData;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void tsdbCachePutBatch(SLastCol *pLastCol, const void *key, size_t klen, SCacheFlushState *state) {
|
||||
|
|
Loading…
Reference in New Issue