Merge pull request #26630 from taosdata/fix/TD-30816-3.0
fix: (pk) load last cache from tsdb
This commit is contained in:
commit
f56ab1ee27
|
@ -1307,186 +1307,6 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
|
||||
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||
int nCols, int16_t *slotIds);
|
||||
#ifdef BUILD_NO_CALL
|
||||
static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) {
|
||||
SLastCol *pLastCol = NULL;
|
||||
|
||||
char *err = NULL;
|
||||
size_t vlen = 0;
|
||||
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
|
||||
size_t klen = ROCKS_KEY_LEN;
|
||||
char *value = NULL;
|
||||
value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err);
|
||||
if (NULL != err) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||
rocksdb_free(err);
|
||||
}
|
||||
|
||||
pLastCol = tsdbCacheDeserialize(value);
|
||||
|
||||
return pLastCol;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
|
||||
rocksdb_writebatch_t *wb = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
SArray *pCidList = pr->pCidList;
|
||||
int num_keys = TARRAY_SIZE(pCidList);
|
||||
|
||||
char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
|
||||
size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
|
||||
char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
|
||||
|
||||
memcpy(key_list + i * ROCKS_KEY_LEN, &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}, ROCKS_KEY_LEN);
|
||||
keys_list[i] = key_list + i * ROCKS_KEY_LEN;
|
||||
keys_list_sizes[i] = ROCKS_KEY_LEN;
|
||||
}
|
||||
|
||||
char **values_list = taosMemoryCalloc(num_keys, sizeof(char *));
|
||||
size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
|
||||
char **errs = taosMemoryMalloc(num_keys * sizeof(char *));
|
||||
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list,
|
||||
keys_list_sizes, values_list, values_list_sizes, errs);
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
if (errs[i]) {
|
||||
rocksdb_free(errs[i]);
|
||||
}
|
||||
}
|
||||
taosMemoryFree(key_list);
|
||||
taosMemoryFree(keys_list);
|
||||
taosMemoryFree(keys_list_sizes);
|
||||
taosMemoryFree(errs);
|
||||
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
bool freeCol = true;
|
||||
SArray *pTmpColArray = NULL;
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
||||
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
|
||||
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
|
||||
if (pLastCol) {
|
||||
reallocVarData(&pLastCol->colVal);
|
||||
} else {
|
||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||
|
||||
pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype);
|
||||
if (!pLastCol) {
|
||||
// recalc: load from tsdb
|
||||
int16_t aCols[1] = {cid};
|
||||
int16_t slotIds[1] = {pr->pSlotIds[i]};
|
||||
pTmpColArray = NULL;
|
||||
|
||||
if (ltype) {
|
||||
mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
|
||||
} else {
|
||||
mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
|
||||
}
|
||||
|
||||
if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) {
|
||||
pLastCol = taosArrayGet(pTmpColArray, 0);
|
||||
freeCol = false;
|
||||
}
|
||||
|
||||
// still null, then make up a none col value
|
||||
if (!pLastCol) {
|
||||
pLastCol = &noneCol;
|
||||
freeCol = false;
|
||||
}
|
||||
|
||||
// store result back to rocks cache
|
||||
wb = pTsdb->rCache.rwritebatch;
|
||||
char *value = NULL;
|
||||
size_t vlen = 0;
|
||||
tsdbCacheSerialize(pLastCol, &value, &vlen);
|
||||
|
||||
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = pLastCol->colVal.cid};
|
||||
size_t klen = ROCKS_KEY_LEN;
|
||||
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
|
||||
|
||||
taosMemoryFree(value);
|
||||
} else {
|
||||
reallocVarData(&pLastCol->colVal);
|
||||
}
|
||||
|
||||
if (wb) {
|
||||
rocksMayWrite(pTsdb, false, true, false);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
||||
}
|
||||
|
||||
taosArrayPush(pLastArray, pLastCol);
|
||||
taosArrayDestroy(pTmpColArray);
|
||||
if (freeCol) {
|
||||
taosMemoryFree(pLastCol);
|
||||
}
|
||||
}
|
||||
taosMemoryFree(values_list);
|
||||
taosMemoryFree(values_list_sizes);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t slotid, tb_uid_t uid, int16_t cid,
|
||||
int8_t ltype) {
|
||||
SLastCol *pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype);
|
||||
if (!pLastCol) {
|
||||
rocksdb_writebatch_t *wb = NULL;
|
||||
|
||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||
pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype);
|
||||
if (!pLastCol) {
|
||||
// recalc: load from tsdb
|
||||
int16_t aCols[1] = {cid};
|
||||
int16_t slotIds[1] = {slotid};
|
||||
SArray *pTmpColArray = NULL;
|
||||
|
||||
if (ltype) {
|
||||
mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
|
||||
} else {
|
||||
mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
|
||||
}
|
||||
|
||||
if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) {
|
||||
pLastCol = taosArrayGet(pTmpColArray, 0);
|
||||
}
|
||||
|
||||
// still null, then make up a none col value
|
||||
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[slotid].type)};
|
||||
if (!pLastCol) {
|
||||
pLastCol = &noneCol;
|
||||
}
|
||||
|
||||
// store result back to rocks cache
|
||||
wb = pTsdb->rCache.rwritebatch;
|
||||
char *value = NULL;
|
||||
size_t vlen = 0;
|
||||
tsdbCacheSerialize(pLastCol, &value, &vlen);
|
||||
|
||||
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = pLastCol->colVal.cid};
|
||||
size_t klen = ROCKS_KEY_LEN;
|
||||
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
|
||||
taosMemoryFree(value);
|
||||
|
||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
*pTmpLastCol = *pLastCol;
|
||||
pLastCol = pTmpLastCol;
|
||||
|
||||
taosArrayDestroy(pTmpColArray);
|
||||
}
|
||||
|
||||
if (wb) {
|
||||
rocksMayWrite(pTsdb, false, true, false);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
||||
}
|
||||
|
||||
return pLastCol;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
|
||||
SCacheRowsReader *pr, int8_t ltype) {
|
||||
|
@ -3453,8 +3273,9 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
taosArrayPush(aColArray, &aCols[i]);
|
||||
}
|
||||
|
||||
TSKEY lastRowTs = TSKEY_MAX;
|
||||
STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};
|
||||
|
||||
// inverse iterator
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
|
||||
|
||||
|
@ -3478,10 +3299,11 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
}
|
||||
// int16_t nCol = pTSchema->numOfCols;
|
||||
|
||||
TSKEY rowTs = TSDBROW_TS(pRow);
|
||||
STsdbRowKey rowKey = {0};
|
||||
tsdbRowGetKey(pRow, &rowKey);
|
||||
|
||||
if (lastRowTs == TSKEY_MAX) {
|
||||
lastRowTs = rowTs;
|
||||
if (lastRowKey.key.ts == TSKEY_MAX) { // first time
|
||||
lastRowKey = rowKey;
|
||||
|
||||
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
|
||||
if (iCol >= nLastCol) {
|
||||
|
@ -3501,13 +3323,13 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
if (slotIds[iCol] == 0) {
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs}));
|
||||
taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal});
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
|
||||
taosArraySet(pColArray, 0, &(SLastCol){.rowKey = rowKey.key, .colVal = *pColVal});
|
||||
continue;
|
||||
}
|
||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||
|
||||
*pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal};
|
||||
*pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal};
|
||||
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
|
||||
if (pColVal->value.nData > 0) {
|
||||
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
|
||||
|
@ -3554,10 +3376,11 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
continue;
|
||||
}
|
||||
SColVal *tColVal = &lastColVal->colVal;
|
||||
if (COL_VAL_IS_VALUE(tColVal)) continue;
|
||||
|
||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||
if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
|
||||
SLastCol lastCol = {.rowKey.ts = rowTs, .colVal = *pColVal};
|
||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
||||
SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal};
|
||||
if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) {
|
||||
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
|
||||
taosMemoryFree(pLastCol->colVal.value.pData);
|
||||
|
@ -3580,7 +3403,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
if (aColIndex >= 0) {
|
||||
taosArrayRemove(aColArray, aColIndex);
|
||||
}
|
||||
} else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
|
||||
} else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
|
@ -3637,8 +3460,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
|
|||
taosArrayPush(aColArray, &aCols[i]);
|
||||
}
|
||||
|
||||
TSKEY lastRowTs = TSKEY_MAX;
|
||||
|
||||
// inverse iterator
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
|
||||
|
||||
|
@ -3662,9 +3484,8 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
|
|||
}
|
||||
// int16_t nCol = pTSchema->numOfCols;
|
||||
|
||||
TSKEY rowTs = TSDBROW_TS(pRow);
|
||||
|
||||
lastRowTs = rowTs;
|
||||
STsdbRowKey rowKey = {0};
|
||||
tsdbRowGetKey(pRow, &rowKey);
|
||||
|
||||
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
|
||||
if (iCol >= nLastCol) {
|
||||
|
@ -3680,13 +3501,13 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
|
|||
if (slotIds[iCol] == 0) {
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs}));
|
||||
taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal});
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
|
||||
taosArraySet(pColArray, 0, &(SLastCol){.rowKey = rowKey.key, .colVal = *pColVal});
|
||||
continue;
|
||||
}
|
||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||
|
||||
*pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal};
|
||||
*pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal};
|
||||
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
|
||||
if (pColVal->value.nData > 0) {
|
||||
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
|
||||
|
|
Loading…
Reference in New Issue