first round rocks batch write implementation
This commit is contained in:
parent
ec4ea9cf5d
commit
e159c4ead9
|
@ -348,6 +348,7 @@ typedef struct {
|
|||
rocksdb_options_t *options;
|
||||
rocksdb_writeoptions_t *writeoptions;
|
||||
rocksdb_readoptions_t *readoptions;
|
||||
rocksdb_writebatch_t *writebatch;
|
||||
TdThreadMutex rMutex;
|
||||
} SRocksCache;
|
||||
|
||||
|
@ -805,7 +806,7 @@ typedef struct {
|
|||
|
||||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||
void tsdbCloseCache(STsdb *pTsdb);
|
||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t uid, TSDBROW *row);
|
||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup);
|
||||
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
|
||||
|
|
|
@ -43,6 +43,8 @@ static void tsdbCloseBICache(STsdb *pTsdb) {
|
|||
}
|
||||
}
|
||||
|
||||
#define ROCKS_KEY_LEN 64
|
||||
|
||||
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
|
||||
SVnode *pVnode = pTsdb->pVnode;
|
||||
if (pVnode->pTfs) {
|
||||
|
@ -92,8 +94,11 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
|
|||
goto _err3;
|
||||
}
|
||||
|
||||
rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
|
||||
|
||||
taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL);
|
||||
|
||||
pTsdb->rCache.writebatch = writebatch;
|
||||
pTsdb->rCache.options = options;
|
||||
pTsdb->rCache.writeoptions = writeoptions;
|
||||
pTsdb->rCache.readoptions = readoptions;
|
||||
|
@ -113,37 +118,167 @@ _err:
|
|||
|
||||
static void tsdbCloseRocksCache(STsdb *pTsdb) {
|
||||
rocksdb_close(pTsdb->rCache.db);
|
||||
rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
|
||||
rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
|
||||
rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
|
||||
rocksdb_options_destroy(pTsdb->rCache.options);
|
||||
}
|
||||
|
||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t uid, TSDBROW *pRow) {
|
||||
SLastCol *tsdbCacheDeserialize(char const *value) {
|
||||
SLastCol *pLastCol = (SLastCol *)value;
|
||||
SColVal *pColVal = &pLastCol->colVal;
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||
pColVal->value.pData = (char *)value + sizeof(*pColVal);
|
||||
}
|
||||
|
||||
return pLastCol;
|
||||
}
|
||||
|
||||
void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
|
||||
SColVal *pColVal = &pLastCol->colVal;
|
||||
size_t length = sizeof(*pLastCol);
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||
length += pColVal->value.nData;
|
||||
}
|
||||
*value = taosMemoryMalloc(length);
|
||||
|
||||
*(SLastCol *)(*value) = *pLastCol;
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||
uint8_t *pVal = pColVal->value.pData;
|
||||
pColVal->value.pData = *value + sizeof(*pLastCol);
|
||||
if (pColVal->value.nData) {
|
||||
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
|
||||
}
|
||||
}
|
||||
*size = length;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
// 1, fetch schema
|
||||
STSchema *pTSchema = NULL;
|
||||
int32_t sver = TSDBROW_SVERSION(pRow);
|
||||
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// 2, iterate col values into array
|
||||
SArray *aColVal = taosArrayInit(32, sizeof(SColVal));
|
||||
|
||||
STSDBRowIter iter = {0};
|
||||
tsdbRowIterOpen(&iter, pRow, pTSchema);
|
||||
|
||||
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
|
||||
SColVal *pRColVal = tsdbCacheGetRColVal(pTsdb);
|
||||
if (pRColVal) {
|
||||
// merge pColVal with pRColVal
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||
uint8_t *pVal = pColVal->value.pData;
|
||||
|
||||
pColVal->value.pData = NULL;
|
||||
code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
|
||||
if (code) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (pColVal->value.nData) {
|
||||
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
|
||||
}
|
||||
}
|
||||
|
||||
tsdbCachePutRColVal(pColVal);
|
||||
taosArrayPush(aColVal, pColVal);
|
||||
}
|
||||
|
||||
tsdbRowClose(&iter);
|
||||
|
||||
char *err = NULL;
|
||||
char buf[256] = {0};
|
||||
size_t vallen = 0;
|
||||
char *val = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, "key", 3, &vallen, &err);
|
||||
if (val) {
|
||||
} else {
|
||||
}
|
||||
rocksdb_put(pTsdb->rCache.db, pTsdb->rCache.writeoptions, "key", 3, "value", 5, &err);
|
||||
// 3, build keys & multi get from rocks
|
||||
int max_key_len = 0;
|
||||
int num_keys = TARRAY_SIZE(aColVal);
|
||||
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
||||
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
|
||||
|
||||
char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN);
|
||||
int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pColVal->cid);
|
||||
if (last_key_len >= ROCKS_KEY_LEN) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
}
|
||||
int lr_key_len =
|
||||
snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pColVal->cid);
|
||||
if (lr_key_len >= ROCKS_KEY_LEN) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
}
|
||||
keys_list[i] = keys;
|
||||
keys_list[num_keys + i] = keys + ROCKS_KEY_LEN;
|
||||
keys_list_sizes[i] = last_key_len;
|
||||
keys_list_sizes[num_keys + i] = lr_key_len;
|
||||
}
|
||||
char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
||||
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
||||
char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
||||
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
|
||||
keys_list_sizes, values_list, values_list_sizes, errs);
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
taosMemoryFree(keys_list[i]);
|
||||
}
|
||||
taosMemoryFree(keys_list);
|
||||
taosMemoryFree(keys_list_sizes);
|
||||
taosMemoryFree(errs);
|
||||
|
||||
TSKEY keyTs = TSDBROW_TS(pRow);
|
||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
|
||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
||||
SLastCol *pLastCol = NULL;
|
||||
if (NULL != values_list[i]) {
|
||||
pLastCol = tsdbCacheDeserialize(values_list[i]);
|
||||
}
|
||||
|
||||
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
|
||||
char *value = NULL;
|
||||
size_t vlen = 0;
|
||||
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
|
||||
char key[ROCKS_KEY_LEN];
|
||||
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pColVal->cid);
|
||||
rocksdb_writebatch_put(wb, key, klen, value, vlen);
|
||||
taosMemoryFree(value);
|
||||
}
|
||||
}
|
||||
|
||||
if (!COL_VAL_IS_NONE(pColVal)) {
|
||||
SLastCol *pLastCol = NULL;
|
||||
if (NULL != values_list[i + num_keys]) {
|
||||
pLastCol = tsdbCacheDeserialize(values_list[i]);
|
||||
}
|
||||
|
||||
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
|
||||
char *value = NULL;
|
||||
size_t vlen = 0;
|
||||
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
|
||||
char key[ROCKS_KEY_LEN];
|
||||
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pColVal->cid);
|
||||
rocksdb_writebatch_put(wb, key, klen, value, vlen);
|
||||
taosMemoryFree(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
taosMemoryFree(values_list);
|
||||
taosMemoryFree(values_list_sizes);
|
||||
|
||||
char *err = NULL;
|
||||
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
|
||||
if (NULL != err) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||
rocksdb_free(err);
|
||||
}
|
||||
rocksdb_writebatch_clear(wb);
|
||||
|
||||
_exit:
|
||||
taosArrayDestroy(aColVal);
|
||||
taosMemoryFree(pTSchema);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -678,7 +678,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
|||
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb);
|
||||
}
|
||||
*/
|
||||
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->uid, &lRow);
|
||||
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
|
||||
|
||||
// SMemTable
|
||||
pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
|
||||
|
@ -748,7 +748,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
|||
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb);
|
||||
}
|
||||
*/
|
||||
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->uid, &lRow);
|
||||
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
|
||||
|
||||
// SMemTable
|
||||
pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
|
||||
|
|
Loading…
Reference in New Issue