diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 7d22e59b4a..b3999caace 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -341,6 +341,16 @@ _exit: return code; } +static void reallocVarData(SColVal *pColVal) { + if (IS_VAR_DATA_TYPE(pColVal->type)) { + uint8_t *pVal = pColVal->value.pData; + pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); + if (pColVal->value.nData) { + memcpy(pColVal->value.pData, pVal, pColVal->value.nData); + } + } +} + static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds); @@ -348,9 +358,10 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, int nCols, int16_t *slotIds); int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) { - static char const *alstring[2] = {"last_row", "last"}; - char const *lstring = alstring[ltype]; - int32_t code = 0; + static char const *alstring[2] = {"last_row", "last"}; + char const *lstring = alstring[ltype]; + rocksdb_writebatch_t *wb = NULL; + int32_t code = 0; SArray *pCidList = pr->pCidList; int num_keys = TARRAY_SIZE(pCidList); @@ -388,14 +399,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; if (pLastCol) { - SColVal *pColVal = &pLastCol->colVal; - if (IS_VAR_DATA_TYPE(pColVal->type)) { - uint8_t *pVal = pColVal->value.pData; - pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); - if (pColVal->value.nData) { - memcpy(pColVal->value.pData, pVal, pColVal->value.nData); - } - } + reallocVarData(&pLastCol->colVal); } else { taosThreadMutexLock(&pTsdb->rCache.rMutex); @@ -424,31 +428,28 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR } // store result back to rocks cache - rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; - char *value = NULL; - size_t vlen = 0; + wb = pTsdb->rCache.writebatch; + char *value = NULL; + size_t vlen = 0; tsdbCacheSerialize(pLastCol, &value, &vlen); char key[ROCKS_KEY_LEN]; size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, pLastCol->colVal.cid, lstring); rocksdb_writebatch_put(wb, key, klen, value, vlen); taosMemoryFree(value); + } else { + reallocVarData(&pLastCol->colVal); + } + if (wb) { char *err = NULL; rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); if (NULL != err) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); rocksdb_free(err); } - } else { - SColVal *pColVal = &pLastCol->colVal; - if (IS_VAR_DATA_TYPE(pColVal->type)) { - uint8_t *pVal = pColVal->value.pData; - pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); - if (pColVal->value.nData) { - memcpy(pColVal->value.pData, pVal, pColVal->value.nData); - } - } + + rocksdb_writebatch_clear(wb); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex);