fix(cache/get): move rocks wring out of column iterating
This commit is contained in:
parent
0958fe5635
commit
10cee51d32
|
@ -340,6 +340,16 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static void reallocVarData(SColVal *pColVal) {
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||
uint8_t *pVal = pColVal->value.pData;
|
||||
pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData);
|
||||
if (pColVal->value.nData) {
|
||||
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||
int nCols, int16_t *slotIds);
|
||||
|
||||
|
@ -349,6 +359,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
|
|||
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) {
|
||||
static char const *alstring[2] = {"last_row", "last"};
|
||||
char const *lstring = alstring[ltype];
|
||||
rocksdb_writebatch_t *wb = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
SArray *pCidList = pr->pCidList;
|
||||
|
@ -387,14 +398,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
|
|||
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
|
||||
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
|
||||
if (pLastCol) {
|
||||
SColVal *pColVal = &pLastCol->colVal;
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||
uint8_t *pVal = pColVal->value.pData;
|
||||
pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData);
|
||||
if (pColVal->value.nData) {
|
||||
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
|
||||
}
|
||||
}
|
||||
reallocVarData(&pLastCol->colVal);
|
||||
} else {
|
||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||
|
||||
|
@ -423,7 +427,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
|
|||
}
|
||||
|
||||
// store result back to rocks cache
|
||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||
wb = pTsdb->rCache.writebatch;
|
||||
char *value = NULL;
|
||||
size_t vlen = 0;
|
||||
tsdbCacheSerialize(pLastCol, &value, &vlen);
|
||||
|
@ -432,22 +436,8 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
|
|||
rocksdb_writebatch_put(wb, key, klen, value, vlen);
|
||||
|
||||
taosMemoryFree(value);
|
||||
|
||||
char *err = NULL;
|
||||
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
|
||||
if (NULL != err) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||
rocksdb_free(err);
|
||||
}
|
||||
} else {
|
||||
SColVal *pColVal = &pLastCol->colVal;
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||
uint8_t *pVal = pColVal->value.pData;
|
||||
pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData);
|
||||
if (pColVal->value.nData) {
|
||||
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
|
||||
}
|
||||
}
|
||||
reallocVarData(&pLastCol->colVal);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
||||
|
@ -463,6 +453,14 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
|
|||
taosMemoryFree(values_list);
|
||||
taosMemoryFree(values_list_sizes);
|
||||
|
||||
if (wb) {
|
||||
char *err = NULL;
|
||||
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
|
||||
if (NULL != err) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||
rocksdb_free(err);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue