fix: pk last write error

This commit is contained in:
Shungang Li 2024-04-09 17:42:20 +08:00
parent 87b04c4eea
commit dd55034c94
2 changed files with 86 additions and 31 deletions

View File

@ -895,6 +895,7 @@ typedef enum {
} EExecMode; } EExecMode;
typedef struct { typedef struct {
int64_t version;
SRowKey rowKey; SRowKey rowKey;
int8_t dirty; int8_t dirty;
SColVal colVal; SColVal colVal;

View File

@ -339,6 +339,7 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
static SLastCol *tsdbCacheConvertLastColV1(SLastColV1 *pLastColV1) { static SLastCol *tsdbCacheConvertLastColV1(SLastColV1 *pLastColV1) {
SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
if (pLastCol == NULL) return NULL; if (pLastCol == NULL) return NULL;
pLastCol->version = LAST_COL_VERSION;
pLastCol->rowKey.ts = pLastColV1->ts; pLastCol->rowKey.ts = pLastColV1->ts;
pLastCol->rowKey.numOfPKs = 0; pLastCol->rowKey.numOfPKs = 0;
pLastCol->dirty = pLastColV1->dirty; pLastCol->dirty = pLastColV1->dirty;
@ -408,7 +409,7 @@ static SLastCol *tsdbCacheDeserialize(char const *value) {
if (!hasVersion) { if (!hasVersion) {
return tsdbCacheDeserializeV1(value); return tsdbCacheDeserializeV1(value);
} }
return tsdbCacheDeserializeV2(value + sizeof(int64_t)); return tsdbCacheDeserializeV2(value);
} }
static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) { static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) {
@ -423,7 +424,7 @@ static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) {
static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
SColVal *pColVal = &pLastCol->colVal; SColVal *pColVal = &pLastCol->colVal;
size_t length = sizeof(int64_t) + sizeof(*pLastCol); size_t length = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) { if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
length += pLastCol->rowKey.pks[i].nData; length += pLastCol->rowKey.pks[i].nData;
@ -435,14 +436,11 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
// set version // set version
*value = taosMemoryMalloc(length); *value = taosMemoryMalloc(length);
char *currentPos = *value;
*(int64_t *)currentPos = LAST_COL_VERSION;
currentPos += sizeof(int64_t);
// copy last col // copy last col
SLastCol* pToLastCol = (SLastCol *)currentPos; SLastCol* pToLastCol = (SLastCol *)(*value);
*pToLastCol = *pLastCol; *pToLastCol = *pLastCol;
currentPos += sizeof(*pLastCol); char *currentPos = *value + sizeof(*pLastCol);
// copy var data pks // copy var data pks
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
@ -535,18 +533,22 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) {
return code; return code;
} }
static void reallocVarData(SColVal *pColVal) { static void reallocVarDataVal(SValue *pValue) {
if (IS_VAR_DATA_TYPE(pColVal->value.type)) { if (IS_VAR_DATA_TYPE(pValue->type)) {
uint8_t *pVal = pColVal->value.pData; uint8_t *pVal = pValue->pData;
if (pColVal->value.nData > 0) { if (pValue->nData > 0) {
pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); pValue->pData = taosMemoryMalloc(pValue->nData);
memcpy(pColVal->value.pData, pVal, pColVal->value.nData); memcpy(pValue->pData, pVal, pValue->nData);
} else { } else {
pColVal->value.pData = NULL; pValue->pData = NULL;
} }
} }
} }
static void reallocVarData(SColVal *pColVal) {
reallocVarDataVal(&pColVal->value);
}
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) { static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
SLastCol *pLastCol = (SLastCol *)value; SLastCol *pLastCol = (SLastCol *)value;
@ -569,16 +571,26 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
SRowKey noneRowKey = {0}; SRowKey noneRowKey = {0};
noneRowKey.ts = TSKEY_MIN; noneRowKey.ts = TSKEY_MIN;
noneRowKey.numOfPKs = 0; noneRowKey.numOfPKs = 0;
SLastCol noneCol = {.rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1}; SLastCol noneCol = {
.version = LAST_COL_VERSION, .rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1};
SLastCol *pLastCol = &noneCol; SLastCol *pLastCol = &noneCol;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol; *pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol; pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol); size_t charge = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
reallocVarDataVal(pValue);
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
reallocVarData(&pLastCol->colVal);
charge += pLastCol->colVal.value.nData; charge += pLastCol->colVal.value.nData;
} }
@ -923,6 +935,7 @@ static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal
int nData = 0; int nData = 0;
// update rowkey // update rowkey
pLastCol->version = LAST_COL_VERSION;
pLastCol->rowKey.ts = pRowKey->ts; pLastCol->rowKey.ts = pRowKey->ts;
pLastCol->rowKey.numOfPKs = pRowKey->numOfPKs; pLastCol->rowKey.numOfPKs = pRowKey->numOfPKs;
for (int8_t i = 0; i < pRowKey->numOfPKs; i++) { for (int8_t i = 0; i < pRowKey->numOfPKs; i++) {
@ -1015,7 +1028,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
if (h) { if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) {
tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal);
} }
@ -1032,7 +1044,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
if (h) { if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) {
tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal);
} }
@ -1096,9 +1107,17 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
*pTmpLastCol = *pLastCol; *pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol; pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol); size_t charge = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
reallocVarDataVal(pValue);
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
reallocVarData(&pLastCol->colVal);
charge += pLastCol->colVal.value.nData; charge += pLastCol->colVal.value.nData;
} }
@ -1128,9 +1147,17 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
*pTmpLastCol = *pLastCol; *pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol; pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol); size_t charge = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
reallocVarDataVal(pValue);
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
reallocVarData(&pLastCol->colVal);
charge += pLastCol->colVal.value.nData; charge += pLastCol->colVal.value.nData;
} }
@ -1428,7 +1455,8 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
} }
// still null, then make up a none col value // still null, then make up a none col value
SLastCol noneCol = {.rowKey.ts = TSKEY_MIN, SLastCol noneCol = {.version = LAST_COL_VERSION,
.rowKey.ts = TSKEY_MIN,
.colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)}; .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)};
if (!pLastCol) { if (!pLastCol) {
pLastCol = &noneCol; pLastCol = &noneCol;
@ -1446,9 +1474,16 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
*pTmpLastCol = *pLastCol; *pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol; pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol); size_t charge = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
reallocVarDataVal(pValue);
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
reallocVarData(&pLastCol->colVal);
charge += pLastCol->colVal.value.nData; charge += pLastCol->colVal.value.nData;
} }
@ -1530,9 +1565,16 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
*pTmpLastCol = *pLastCol; *pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol; pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol); size_t charge = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
reallocVarDataVal(pValue);
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
reallocVarData(&pLastCol->colVal);
charge += pLastCol->colVal.value.nData; charge += pLastCol->colVal.value.nData;
} }
@ -1543,6 +1585,9 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
} }
SLastCol lastCol = *pLastCol; SLastCol lastCol = *pLastCol;
for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) {
reallocVarDataVal(&lastCol.rowKey.pks[i]);
}
reallocVarData(&lastCol.colVal); reallocVarData(&lastCol.colVal);
taosArraySet(pLastArray, idxKey->idx, &lastCol); taosArraySet(pLastArray, idxKey->idx, &lastCol);
taosArrayRemove(remainCols, j); taosArrayRemove(remainCols, j);
@ -1591,12 +1636,17 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol; SLastCol lastCol = *pLastCol;
for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) {
reallocVarDataVal(&lastCol.rowKey.pks[i]);
}
reallocVarData(&lastCol.colVal); reallocVarData(&lastCol.colVal);
taosArrayPush(pLastArray, &lastCol); taosArrayPush(pLastArray, &lastCol);
taosLRUCacheRelease(pCache, h, false); taosLRUCacheRelease(pCache, h, false);
} else { } else {
SLastCol noneCol = {.rowKey.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; SLastCol noneCol = {.version = LAST_COL_VERSION,
.rowKey.ts = TSKEY_MIN,
.colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
taosArrayPush(pLastArray, &noneCol); taosArrayPush(pLastArray, &noneCol);
@ -1616,6 +1666,9 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol; SLastCol lastCol = *pLastCol;
for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) {
reallocVarDataVal(&lastCol.rowKey.pks[i]);
}
reallocVarData(&lastCol.colVal); reallocVarData(&lastCol.colVal);
taosArraySet(pLastArray, idxKey->idx, &lastCol); taosArraySet(pLastArray, idxKey->idx, &lastCol);
@ -1702,7 +1755,6 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[i], klen); rocksdb_writebatch_delete(wb, keys_list[i], klen);
} }
taosMemoryFreeClear(pLastCol); taosMemoryFreeClear(pLastCol);
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
@ -3232,7 +3284,9 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray,
for (int32_t i = 0; i < nCols; ++i) { for (int32_t i = 0; i < nCols; ++i) {
int16_t slotId = slotIds[i]; int16_t slotId = slotIds[i];
SLastCol col = {.rowKey.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)}; SLastCol col = {.version = LAST_COL_VERSION,
.rowKey.ts = 0,
.colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
taosArrayPush(pColArray, &col); taosArrayPush(pColArray, &col);
} }
*ppColArray = pColArray; *ppColArray = pColArray;
@ -3337,12 +3391,12 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
STColumn *pTColumn = &pTSchema->columns[0]; STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs})); *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs}));
taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}); taosArraySet(pColArray, 0, &(SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal});
continue; continue;
} }
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}; *pCol = (SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
if (pColVal->value.nData > 0) { if (pColVal->value.nData > 0) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
@ -3392,7 +3446,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) { if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
SLastCol lastCol = {.rowKey.ts = rowTs, .colVal = *pColVal}; SLastCol lastCol = {.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) { if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
taosMemoryFree(pLastCol->colVal.value.pData); taosMemoryFree(pLastCol->colVal.value.pData);
@ -3516,12 +3570,12 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
STColumn *pTColumn = &pTSchema->columns[0]; STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs})); *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs}));
taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}); taosArraySet(pColArray, 0, &(SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal});
continue; continue;
} }
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal}; *pCol = (SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
if (pColVal->value.nData > 0) { if (pColVal->value.nData > 0) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);