Merge pull request #21479 from taosdata/fix/TD-24405
fix(cache): fix deleted data
This commit is contained in:
commit
66b7d90849
|
@ -703,6 +703,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
||||||
*pTmpLastCol = *pLastCol;
|
*pTmpLastCol = *pLastCol;
|
||||||
pLastCol = pTmpLastCol;
|
pLastCol = pTmpLastCol;
|
||||||
|
|
||||||
|
reallocVarData(&pLastCol->colVal);
|
||||||
size_t charge = sizeof(*pLastCol);
|
size_t charge = sizeof(*pLastCol);
|
||||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
|
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
|
||||||
charge += pLastCol->colVal.value.nData;
|
charge += pLastCol->colVal.value.nData;
|
||||||
|
@ -789,7 +790,9 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArraySet(pLastArray, idxKey->idx, pLastCol);
|
SLastCol lastCol = *pLastCol;
|
||||||
|
reallocVarData(&lastCol.colVal);
|
||||||
|
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
||||||
taosArrayRemove(remainCols, j);
|
taosArrayRemove(remainCols, j);
|
||||||
|
|
||||||
taosMemoryFree(values_list[i]);
|
taosMemoryFree(values_list[i]);
|
||||||
|
@ -825,7 +828,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
||||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||||
|
|
||||||
SLastCol lastCol = *pLastCol;
|
SLastCol lastCol = *pLastCol;
|
||||||
// reallocVarData(&lastCol.colVal);
|
reallocVarData(&lastCol.colVal);
|
||||||
taosArrayPush(pLastArray, &lastCol);
|
taosArrayPush(pLastArray, &lastCol);
|
||||||
|
|
||||||
if (h) {
|
if (h) {
|
||||||
|
@ -853,8 +856,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
||||||
|
|
||||||
SLastCol lastCol = *pLastCol;
|
SLastCol lastCol = *pLastCol;
|
||||||
reallocVarData(&lastCol.colVal);
|
reallocVarData(&lastCol.colVal);
|
||||||
|
|
||||||
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
||||||
|
|
||||||
if (h) {
|
if (h) {
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
taosLRUCacheRelease(pCache, h, false);
|
||||||
}
|
}
|
||||||
|
@ -937,14 +940,14 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
}
|
}
|
||||||
|
|
||||||
// build keys & multi get from rocks
|
// build keys & multi get from rocks
|
||||||
int num_keys = pTSchema->numOfCols;
|
int num_keys = pTSchema->numOfCols;
|
||||||
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
||||||
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
||||||
|
const size_t klen = ROCKS_KEY_LEN;
|
||||||
for (int i = 0; i < num_keys; ++i) {
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
int16_t cid = pTSchema->columns[i].colId;
|
int16_t cid = pTSchema->columns[i].colId;
|
||||||
|
|
||||||
size_t klen = ROCKS_KEY_LEN;
|
char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
|
||||||
char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
|
|
||||||
((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid};
|
((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid};
|
||||||
((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid};
|
((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid};
|
||||||
|
|
||||||
|
@ -960,39 +963,35 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
rocksMayWrite(pTsdb, true, false, false);
|
rocksMayWrite(pTsdb, true, false, false);
|
||||||
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
|
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
|
||||||
keys_list_sizes, values_list, values_list_sizes, errs);
|
keys_list_sizes, values_list, values_list_sizes, errs);
|
||||||
for (int i = 0; i < num_keys; ++i) {
|
|
||||||
taosMemoryFree(keys_list[i]);
|
|
||||||
}
|
|
||||||
for (int i = 0; i < num_keys * 2; ++i) {
|
for (int i = 0; i < num_keys * 2; ++i) {
|
||||||
rocksdb_free(errs[i]);
|
if (errs[i]) {
|
||||||
|
rocksdb_free(errs[i]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFree(keys_list);
|
|
||||||
taosMemoryFree(keys_list_sizes);
|
|
||||||
taosMemoryFree(errs);
|
taosMemoryFree(errs);
|
||||||
|
|
||||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||||
for (int i = 0; i < num_keys; ++i) {
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
||||||
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
|
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
|
||||||
SLastKey *key = &(SLastKey){.ltype = 1, .uid = uid, .cid = pLastCol->colVal.cid};
|
rocksdb_writebatch_delete(wb, keys_list[i], klen);
|
||||||
size_t klen = ROCKS_KEY_LEN;
|
|
||||||
|
|
||||||
rocksdb_writebatch_delete(wb, (char *)key, klen);
|
|
||||||
taosLRUCacheErase(pTsdb->lruCache, key, klen);
|
|
||||||
}
|
}
|
||||||
|
taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
|
||||||
|
|
||||||
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
|
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
|
||||||
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
|
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
|
||||||
SLastKey *key = &(SLastKey){.ltype = 0, .uid = uid, .cid = pLastCol->colVal.cid};
|
rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen);
|
||||||
size_t klen = ROCKS_KEY_LEN;
|
|
||||||
|
|
||||||
rocksdb_writebatch_delete(wb, (char *)key, klen);
|
|
||||||
taosLRUCacheErase(pTsdb->lruCache, key, klen);
|
|
||||||
}
|
}
|
||||||
|
taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen);
|
||||||
|
|
||||||
rocksdb_free(values_list[i]);
|
rocksdb_free(values_list[i]);
|
||||||
rocksdb_free(values_list[i + num_keys]);
|
rocksdb_free(values_list[i + num_keys]);
|
||||||
}
|
}
|
||||||
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
|
taosMemoryFree(keys_list[i]);
|
||||||
|
}
|
||||||
|
taosMemoryFree(keys_list);
|
||||||
|
taosMemoryFree(keys_list_sizes);
|
||||||
taosMemoryFree(values_list);
|
taosMemoryFree(values_list);
|
||||||
taosMemoryFree(values_list_sizes);
|
taosMemoryFree(values_list_sizes);
|
||||||
|
|
||||||
|
@ -1871,10 +1870,14 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
||||||
if (isLast && (pColData->flag & HAS_VALUE)) {
|
if (isLast && (pColData->flag & HAS_VALUE)) {
|
||||||
skipBlock = false;
|
skipBlock = false;
|
||||||
break;
|
break;
|
||||||
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
|
} /*else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
|
||||||
skipBlock = false;
|
skipBlock = false;
|
||||||
break;
|
break;
|
||||||
}
|
}*/
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isLast) {
|
||||||
|
skipBlock = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (skipBlock) {
|
if (skipBlock) {
|
||||||
|
@ -1908,6 +1911,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
||||||
if (checkRemainingRow) {
|
if (checkRemainingRow) {
|
||||||
bool skipBlock = true;
|
bool skipBlock = true;
|
||||||
int inputColIndex = 0;
|
int inputColIndex = 0;
|
||||||
|
if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
|
++inputColIndex;
|
||||||
|
}
|
||||||
for (int colIndex = 0; colIndex < state->pBlockData->nColData; ++colIndex) {
|
for (int colIndex = 0; colIndex < state->pBlockData->nColData; ++colIndex) {
|
||||||
SColData *pColData = &state->pBlockData->aColData[colIndex];
|
SColData *pColData = &state->pBlockData->aColData[colIndex];
|
||||||
int16_t cid = pColData->cid;
|
int16_t cid = pColData->cid;
|
||||||
|
@ -1916,15 +1922,19 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
||||||
if (isLast && (pColData->flag & HAS_VALUE)) {
|
if (isLast && (pColData->flag & HAS_VALUE)) {
|
||||||
skipBlock = false;
|
skipBlock = false;
|
||||||
break;
|
break;
|
||||||
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
|
} /*else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
|
||||||
skipBlock = false;
|
skipBlock = false;
|
||||||
break;
|
break;
|
||||||
}
|
}*/
|
||||||
|
|
||||||
++inputColIndex;
|
++inputColIndex;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!isLast) {
|
||||||
|
skipBlock = false;
|
||||||
|
}
|
||||||
|
|
||||||
if (skipBlock) {
|
if (skipBlock) {
|
||||||
if (--state->iBlock < 0) {
|
if (--state->iBlock < 0) {
|
||||||
tsdbDataFReaderClose(state->pDataFReader);
|
tsdbDataFReaderClose(state->pDataFReader);
|
||||||
|
@ -2145,9 +2155,14 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
|
||||||
return false;
|
return false;
|
||||||
} else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
|
} else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
|
||||||
if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
|
if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
|
||||||
|
// if (key->version <= pItemFront->version || key->version <= pItemBack->version) {
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
if (*iSkyline > 1) {
|
||||||
|
--*iSkyline;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (*iSkyline > 1) {
|
if (*iSkyline > 1) {
|
||||||
|
@ -2959,7 +2974,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
|
||||||
|
|
||||||
do {
|
do {
|
||||||
TSDBROW *pRow = NULL;
|
TSDBROW *pRow = NULL;
|
||||||
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
|
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
|
||||||
|
|
||||||
if (!pRow) {
|
if (!pRow) {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -315,14 +315,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
tsdbCacheGetBatch(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
|
tsdbCacheGetBatch(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
|
||||||
// tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
|
// tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
|
||||||
if (TARRAY_SIZE(pRow) <= 0) {
|
if (TARRAY_SIZE(pRow) <= 0) {
|
||||||
// taosArrayClearEx(pRow, freeItem);
|
taosArrayClearEx(pRow, freeItem);
|
||||||
taosArrayClear(pRow);
|
// taosArrayClear(pRow);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SLastCol* pColVal = taosArrayGet(pRow, 0);
|
SLastCol* pColVal = taosArrayGet(pRow, 0);
|
||||||
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
|
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
|
||||||
// taosArrayClearEx(pRow, freeItem);
|
taosArrayClearEx(pRow, freeItem);
|
||||||
taosArrayClear(pRow);
|
// taosArrayClear(pRow);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -381,8 +381,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// taosArrayClearEx(pRow, freeItem);
|
taosArrayClearEx(pRow, freeItem);
|
||||||
taosArrayClear(pRow);
|
// taosArrayClear(pRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasRes) {
|
if (hasRes) {
|
||||||
|
@ -394,20 +394,20 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
|
|
||||||
tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
|
tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
|
||||||
if (TARRAY_SIZE(pRow) <= 0) {
|
if (TARRAY_SIZE(pRow) <= 0) {
|
||||||
// taosArrayClearEx(pRow, freeItem);
|
taosArrayClearEx(pRow, freeItem);
|
||||||
taosArrayClear(pRow);
|
// taosArrayClear(pRow);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
|
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
|
||||||
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
|
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
|
||||||
// taosArrayClearEx(pRow, freeItem);
|
taosArrayClearEx(pRow, freeItem);
|
||||||
taosArrayClear(pRow);
|
// taosArrayClear(pRow);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
|
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
|
||||||
// taosArrayClearEx(pRow, freeItem);
|
taosArrayClearEx(pRow, freeItem);
|
||||||
taosArrayClear(pRow);
|
// taosArrayClear(pRow);
|
||||||
|
|
||||||
taosArrayPush(pTableUidList, &uid);
|
taosArrayPush(pTableUidList, &uid);
|
||||||
|
|
||||||
|
|
|
@ -190,9 +190,9 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
||||||
tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey);
|
tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey);
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
if (eKey >= pTbData->maxKey && sKey <= pTbData->maxKey) {
|
// if (eKey >= pTbData->maxKey && sKey <= pTbData->maxKey) {
|
||||||
tsdbCacheDel(pTsdb, suid, uid, sKey, eKey);
|
tsdbCacheDel(pTsdb, suid, uid, sKey, eKey);
|
||||||
}
|
//}
|
||||||
|
|
||||||
tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
||||||
" at version %" PRId64,
|
" at version %" PRId64,
|
||||||
|
|
Loading…
Reference in New Issue