cache/get: store recalced column value back into rocks
This commit is contained in:
parent
bb7f050b37
commit
198ce399d2
|
@ -365,6 +365,21 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
|
||||||
}
|
}
|
||||||
|
|
||||||
// maybe store it back to rocks cache
|
// maybe store it back to rocks cache
|
||||||
|
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||||
|
char *value = NULL;
|
||||||
|
size_t vlen = 0;
|
||||||
|
tsdbCacheSerialize(pLastCol, &value, &vlen);
|
||||||
|
char key[ROCKS_KEY_LEN];
|
||||||
|
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pLastCol->colVal.cid);
|
||||||
|
rocksdb_writebatch_put(wb, key, klen, value, vlen);
|
||||||
|
char *err = NULL;
|
||||||
|
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
|
||||||
|
if (NULL != err) {
|
||||||
|
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||||
|
rocksdb_free(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pLastArray, pLastCol);
|
taosArrayPush(pLastArray, pLastCol);
|
||||||
|
@ -2256,7 +2271,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
||||||
if (lastRowTs == TSKEY_MAX) {
|
if (lastRowTs == TSKEY_MAX) {
|
||||||
lastRowTs = rowTs;
|
lastRowTs = rowTs;
|
||||||
|
|
||||||
for (int16_t iCol = 0; iCol < nCols; ++iCol) {
|
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
|
||||||
if (iCol >= nLastCol) {
|
if (iCol >= nLastCol) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2264,6 +2279,13 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
||||||
if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
|
if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (slotIds[iCol] == 0) {
|
||||||
|
STColumn *pTColumn = &pTSchema->columns[0];
|
||||||
|
|
||||||
|
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
|
||||||
|
taosArraySet(pColArray, 0, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||||
|
|
||||||
*pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal};
|
*pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal};
|
||||||
|
@ -2336,10 +2358,6 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
||||||
}
|
}
|
||||||
} while (setNoneCol);
|
} while (setNoneCol);
|
||||||
|
|
||||||
// if (taosArrayGetSize(pColArray) <= 0) {
|
|
||||||
//*ppLastArray = NULL;
|
|
||||||
// taosArrayDestroy(pColArray);
|
|
||||||
//} else {
|
|
||||||
if (!hasRow) {
|
if (!hasRow) {
|
||||||
if (ignoreEarlierTs) {
|
if (ignoreEarlierTs) {
|
||||||
taosArrayDestroy(pColArray);
|
taosArrayDestroy(pColArray);
|
||||||
|
@ -2349,11 +2367,10 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*ppLastArray = pColArray;
|
*ppLastArray = pColArray;
|
||||||
//}
|
|
||||||
|
|
||||||
nextRowIterClose(&iter);
|
nextRowIterClose(&iter);
|
||||||
taosArrayDestroy(aColArray);
|
taosArrayDestroy(aColArray);
|
||||||
// taosMemoryFreeClear(pTSchema);
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
|
Loading…
Reference in New Issue