fix: last cache update rule
This commit is contained in:
parent
92a3686c2f
commit
fc047ddafc
|
@ -133,6 +133,7 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc);
|
|||
int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter);
|
||||
void tRowIterClose(SRowIter **ppIter);
|
||||
SColVal *tRowIterNext(SRowIter *pIter);
|
||||
SColVal *tRowIterMoveTo(SRowIter *pIter, int32_t iTColum);
|
||||
|
||||
// STag ================================
|
||||
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
|
||||
|
|
|
@ -774,7 +774,7 @@ void tRowIterClose(SRowIter **ppIter) {
|
|||
*ppIter = NULL;
|
||||
}
|
||||
|
||||
SColVal *tRowIterNext(SRowIter *pIter) {
|
||||
static SColVal *tRowIterGetValue(SRowIter *pIter) {
|
||||
if (pIter->iTColumn >= pIter->pTSchema->numOfCols) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -896,10 +896,25 @@ SColVal *tRowIterNext(SRowIter *pIter) {
|
|||
}
|
||||
|
||||
_exit:
|
||||
pIter->iTColumn++;
|
||||
return &pIter->cv;
|
||||
}
|
||||
|
||||
SColVal* tRowIterNext(SRowIter *pIter) {
|
||||
SColVal* pColVal = tRowIterGetValue(pIter);
|
||||
if (pColVal) {
|
||||
pIter->iTColumn++;
|
||||
}
|
||||
return pColVal;
|
||||
}
|
||||
|
||||
SColVal *tRowIterMoveTo(SRowIter *pIter, int32_t iTColum) {
|
||||
pIter->iTColumn = iTColum;
|
||||
if (pIter->iTColumn < 0 || pIter->iTColumn >= pIter->pTSchema->numOfCols) {
|
||||
return NULL;
|
||||
}
|
||||
return tRowIterGetValue(pIter);
|
||||
}
|
||||
|
||||
static int32_t tRowNoneUpsertColData(SColData *aColData, int32_t nColData, int32_t flag) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
|
|
@ -132,6 +132,7 @@ void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key);
|
|||
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
||||
void tsdbRowClose(STSDBRowIter *pIter);
|
||||
SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
|
||||
SColVal *tsdbRowIterMoveTo(STSDBRowIter *pIter, int32_t iCol);
|
||||
|
||||
// SRowMerger
|
||||
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pSchema);
|
||||
|
@ -902,9 +903,16 @@ typedef struct {
|
|||
SColVal colVal;
|
||||
} SLastCol;
|
||||
|
||||
typedef struct {
|
||||
int8_t lflag;
|
||||
STsdbRowKey tsdbRowKey;
|
||||
SColVal colVal;
|
||||
} SLastUpdateCtx;
|
||||
|
||||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||
void tsdbCloseCache(STsdb *pTsdb);
|
||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
|
||||
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t nRow, SRow **aRow, TSDBROW *pLRow);
|
||||
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData, TSDBROW *pLRow);
|
||||
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
|
||||
|
|
|
@ -992,46 +992,30 @@ static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
|
||||
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
|
||||
if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
// 1, fetch schema
|
||||
STSchema *pTSchema = NULL;
|
||||
int32_t sver = TSDBROW_SVERSION(pRow);
|
||||
|
||||
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// 2, iterate col values into array
|
||||
SArray *aColVal = taosArrayInit(32, sizeof(SColVal));
|
||||
|
||||
STSDBRowIter iter = {0};
|
||||
tsdbRowIterOpen(&iter, pRow, pTSchema);
|
||||
|
||||
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
|
||||
taosArrayPush(aColVal, pColVal);
|
||||
}
|
||||
|
||||
tsdbRowClose(&iter);
|
||||
|
||||
// 3, build keys & multi get from rocks
|
||||
int num_keys = TARRAY_SIZE(aColVal);
|
||||
int num_keys = TARRAY_SIZE(updCtxArray);
|
||||
SArray *remainCols = NULL;
|
||||
SLRUCache *pCache = pTsdb->lruCache;
|
||||
|
||||
STsdbRowKey tsdbRowKey = {0};
|
||||
tsdbRowGetKey(pRow, &tsdbRowKey);
|
||||
SRowKey *pRowKey = &tsdbRowKey.key;
|
||||
|
||||
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
|
||||
int16_t cid = pColVal->cid;
|
||||
SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i);
|
||||
|
||||
SLastKey *key = &(SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};
|
||||
int8_t lflag = updCtx->lflag;
|
||||
SRowKey *pRowKey = &updCtx->tsdbRowKey.key;
|
||||
SColVal *pColVal = &updCtx->colVal;
|
||||
|
||||
if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SLastKey *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
|
||||
size_t klen = ROCKS_KEY_LEN;
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
||||
if (h) {
|
||||
|
@ -1047,23 +1031,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
|||
}
|
||||
taosArrayPush(remainCols, &(SIdxKey){i, *key});
|
||||
}
|
||||
|
||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
||||
key->lflag = LFLAG_LAST;
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
||||
if (h) {
|
||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||
if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) {
|
||||
tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal);
|
||||
}
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
} else {
|
||||
if (!remainCols) {
|
||||
remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
|
||||
}
|
||||
taosArrayPush(remainCols, &(SIdxKey){i, *key});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (remainCols) {
|
||||
|
@ -1091,12 +1058,17 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
|||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
|
||||
SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx;
|
||||
// SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx);
|
||||
SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i);
|
||||
SRowKey *pRowKey = &updCtx->tsdbRowKey.key;
|
||||
SColVal *pColVal = &updCtx->colVal;
|
||||
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]);
|
||||
SLastCol *PToFree = pLastCol;
|
||||
|
||||
if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (IS_LAST_ROW_KEY(idxKey->key)) {
|
||||
int32_t cmp_res = 1;
|
||||
if (pLastCol) {
|
||||
|
@ -1142,48 +1114,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
|||
|
||||
taosMemoryFree(value);
|
||||
}
|
||||
} else {
|
||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
||||
if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) {
|
||||
char *value = NULL;
|
||||
size_t vlen = 0;
|
||||
SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal};
|
||||
tsdbCacheSerialize(&lastColTmp, &value, &vlen);
|
||||
|
||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||
|
||||
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
|
||||
|
||||
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
||||
|
||||
pLastCol = &lastColTmp;
|
||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
*pTmpLastCol = *pLastCol;
|
||||
pLastCol = pTmpLastCol;
|
||||
|
||||
size_t charge = sizeof(*pLastCol);
|
||||
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||
SValue *pValue = &pLastCol->rowKey.pks[i];
|
||||
if (IS_VAR_DATA_TYPE(pValue->type)) {
|
||||
reallocVarDataVal(pValue);
|
||||
charge += pValue->nData;
|
||||
}
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
|
||||
reallocVarData(&pLastCol->colVal);
|
||||
charge += pLastCol->colVal.value.nData;
|
||||
}
|
||||
|
||||
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge,
|
||||
tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
||||
if (status != TAOS_LRU_STATUS_OK) {
|
||||
code = -1;
|
||||
}
|
||||
|
||||
taosMemoryFree(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(PToFree);
|
||||
|
@ -1203,11 +1133,353 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
|||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
|
||||
_exit:
|
||||
taosArrayDestroy(aColVal);
|
||||
taosMemoryFree(pTSchema);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t nRow, SRow **aRow, TSDBROW *pLRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
// 1. prepare last
|
||||
STSchema *pTSchema = NULL;
|
||||
int32_t version = TSDBROW_VERSION(pLRow);
|
||||
int32_t sver = TSDBROW_SVERSION(pLRow);
|
||||
SArray *ctxArray = NULL;
|
||||
SSHashObj *iColHash = NULL;
|
||||
|
||||
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
|
||||
int32_t nCol = pTSchema->numOfCols;
|
||||
|
||||
ctxArray = taosArrayInit(nCol, sizeof(SLastUpdateCtx));
|
||||
iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
|
||||
for (int32_t iCol = 0; iCol < nCol; ++iCol) {
|
||||
tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0);
|
||||
}
|
||||
|
||||
for (int32_t iRow = nRow - 1; iRow >= 0; --iRow) {
|
||||
if (tSimpleHashGetSize(iColHash) == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
tRow.pTSRow = aRow[iRow];
|
||||
|
||||
STsdbRowKey tsdbRowKey = {0};
|
||||
tsdbRowGetKey(&tRow, &tsdbRowKey);
|
||||
|
||||
void *pIte = NULL;
|
||||
int32_t iter = 0;
|
||||
while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
|
||||
int32_t iCol = ((int32_t *)pIte)[0];
|
||||
SColVal colVal = COL_VAL_NONE(0, 0);
|
||||
tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
|
||||
|
||||
if (COL_VAL_IS_VALUE(&colVal)) {
|
||||
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, tsdbRowKey = tsdbRowKey, .colVal = colVal};
|
||||
taosArrayPush(ctxArray, &updateCtx);
|
||||
tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 2. prepare last row
|
||||
STsdbRowKey tsdbRowKey = {0};
|
||||
tsdbRowGetKey(pLRow, &tsdbRowKey);
|
||||
|
||||
STSDBRowIter iter = {0};
|
||||
tsdbRowIterOpen(&iter, pLRow, pTSchema);
|
||||
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
|
||||
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
|
||||
taosArrayPush(ctxArray, &updateCtx);
|
||||
}
|
||||
tsdbRowClose(&iter);
|
||||
|
||||
// 3. do update
|
||||
tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
|
||||
|
||||
_exit:
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
taosArrayDestroy(ctxArray);
|
||||
tSimpleHashCleanup(iColHash);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData, TSDBROW *pLRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
STSchema *pTSchema = NULL;
|
||||
int32_t sver = TSDBROW_SVERSION(pLRow);
|
||||
SArray *ctxArray = NULL;
|
||||
|
||||
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
|
||||
|
||||
// 1. prepare last
|
||||
TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
|
||||
|
||||
for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
|
||||
for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) {
|
||||
SColData *pColData = &pBlockData->aColData[iColData];
|
||||
if ((pColData->flag & HAS_VALUE) == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
|
||||
tColDataGetValue(pColData, tRow.iRow, &colVal);
|
||||
if (COL_VAL_IS_VALUE(&colVal)) {
|
||||
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .colVal = colVal};
|
||||
taosArrayPush(ctxArray, &updateCtx);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 2. prepare last row
|
||||
STsdbRowKey tsdbRowKey = {0};
|
||||
tsdbRowGetKey(pLRow, &tsdbRowKey);
|
||||
|
||||
STSDBRowIter iter = {0};
|
||||
tsdbRowIterOpen(&iter, pLRow, pTSchema);
|
||||
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
|
||||
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
|
||||
taosArrayPush(ctxArray, &updateCtx);
|
||||
}
|
||||
tsdbRowClose(&iter);
|
||||
|
||||
// 3. do update
|
||||
tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
|
||||
|
||||
_exit:
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
taosArrayDestroy(ctxArray);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
|
||||
// int32_t code = 0;
|
||||
|
||||
// // 1, fetch schema
|
||||
// STSchema *pTSchema = NULL;
|
||||
// int32_t sver = TSDBROW_SVERSION(pRow);
|
||||
|
||||
// code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// terrno = code;
|
||||
// return -1;
|
||||
// }
|
||||
|
||||
// // 2, iterate col values into array
|
||||
// SArray *aColVal = taosArrayInit(32, sizeof(SColVal));
|
||||
|
||||
// STSDBRowIter iter = {0};
|
||||
// tsdbRowIterOpen(&iter, pRow, pTSchema);
|
||||
|
||||
// for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
|
||||
// taosArrayPush(aColVal, pColVal);
|
||||
// }
|
||||
|
||||
// tsdbRowClose(&iter);
|
||||
|
||||
// // 3, build keys & multi get from rocks
|
||||
// int num_keys = TARRAY_SIZE(aColVal);
|
||||
// SArray *remainCols = NULL;
|
||||
// SLRUCache *pCache = pTsdb->lruCache;
|
||||
|
||||
// STsdbRowKey tsdbRowKey = {0};
|
||||
// tsdbRowGetKey(pRow, &tsdbRowKey);
|
||||
// SRowKey *pRowKey = &tsdbRowKey.key;
|
||||
|
||||
// taosThreadMutexLock(&pTsdb->lruMutex);
|
||||
// for (int i = 0; i < num_keys; ++i) {
|
||||
// SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
|
||||
// int16_t cid = pColVal->cid;
|
||||
|
||||
// SLastKey *key = &(SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};
|
||||
// size_t klen = ROCKS_KEY_LEN;
|
||||
// LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
||||
// if (h) {
|
||||
// SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||
// int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
|
||||
// if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
|
||||
// tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal);
|
||||
// }
|
||||
// taosLRUCacheRelease(pCache, h, false);
|
||||
// } else {
|
||||
// if (!remainCols) {
|
||||
// remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
|
||||
// }
|
||||
// taosArrayPush(remainCols, &(SIdxKey){i, *key});
|
||||
// }
|
||||
|
||||
// if (COL_VAL_IS_VALUE(pColVal)) {
|
||||
// key->lflag = LFLAG_LAST;
|
||||
// LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
||||
// if (h) {
|
||||
// SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||
// if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) {
|
||||
// tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal);
|
||||
// }
|
||||
// taosLRUCacheRelease(pCache, h, false);
|
||||
// } else {
|
||||
// if (!remainCols) {
|
||||
// remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
|
||||
// }
|
||||
// taosArrayPush(remainCols, &(SIdxKey){i, *key});
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// if (remainCols) {
|
||||
// num_keys = TARRAY_SIZE(remainCols);
|
||||
// }
|
||||
// if (remainCols && num_keys > 0) {
|
||||
// char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
|
||||
// size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
|
||||
// for (int i = 0; i < num_keys; ++i) {
|
||||
// SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
|
||||
|
||||
// keys_list[i] = (char *)&idxKey->key;
|
||||
// keys_list_sizes[i] = ROCKS_KEY_LEN;
|
||||
// }
|
||||
// char **values_list = taosMemoryCalloc(num_keys, sizeof(char *));
|
||||
// size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
|
||||
// char **errs = taosMemoryCalloc(num_keys, sizeof(char *));
|
||||
// rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list,
|
||||
// keys_list_sizes, values_list, values_list_sizes, errs);
|
||||
// for (int i = 0; i < num_keys; ++i) {
|
||||
// rocksdb_free(errs[i]);
|
||||
// }
|
||||
// taosMemoryFree(errs);
|
||||
|
||||
// rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||
// for (int i = 0; i < num_keys; ++i) {
|
||||
// SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
|
||||
// SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx;
|
||||
// // SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx);
|
||||
|
||||
// SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]);
|
||||
// SLastCol *PToFree = pLastCol;
|
||||
|
||||
// if (IS_LAST_ROW_KEY(idxKey->key)) {
|
||||
// int32_t cmp_res = 1;
|
||||
// if (pLastCol) {
|
||||
// cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
|
||||
// }
|
||||
|
||||
// if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
|
||||
// char *value = NULL;
|
||||
// size_t vlen = 0;
|
||||
// SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal};
|
||||
// tsdbCacheSerialize(&lastColTmp, &value, &vlen);
|
||||
|
||||
// taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||
|
||||
// rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
|
||||
|
||||
// taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
||||
|
||||
// pLastCol = &lastColTmp;
|
||||
// SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
// *pTmpLastCol = *pLastCol;
|
||||
// pLastCol = pTmpLastCol;
|
||||
|
||||
// size_t charge = sizeof(*pLastCol);
|
||||
// for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||
// SValue *pValue = &pLastCol->rowKey.pks[i];
|
||||
// if (IS_VAR_DATA_TYPE(pValue->type)) {
|
||||
// reallocVarDataVal(pValue);
|
||||
// charge += pValue->nData;
|
||||
// }
|
||||
// }
|
||||
|
||||
// if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
|
||||
// reallocVarData(&pLastCol->colVal);
|
||||
// charge += pLastCol->colVal.value.nData;
|
||||
// }
|
||||
|
||||
// LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge,
|
||||
// tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
||||
// if (status != TAOS_LRU_STATUS_OK) {
|
||||
// code = -1;
|
||||
// }
|
||||
|
||||
// taosMemoryFree(value);
|
||||
// }
|
||||
// } else {
|
||||
// if (COL_VAL_IS_VALUE(pColVal)) {
|
||||
// if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) {
|
||||
// char *value = NULL;
|
||||
// size_t vlen = 0;
|
||||
// SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal};
|
||||
// tsdbCacheSerialize(&lastColTmp, &value, &vlen);
|
||||
|
||||
// taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||
|
||||
// rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
|
||||
|
||||
// taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
||||
|
||||
// pLastCol = &lastColTmp;
|
||||
// SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
// *pTmpLastCol = *pLastCol;
|
||||
// pLastCol = pTmpLastCol;
|
||||
|
||||
// size_t charge = sizeof(*pLastCol);
|
||||
// for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||
// SValue *pValue = &pLastCol->rowKey.pks[i];
|
||||
// if (IS_VAR_DATA_TYPE(pValue->type)) {
|
||||
// reallocVarDataVal(pValue);
|
||||
// charge += pValue->nData;
|
||||
// }
|
||||
// }
|
||||
|
||||
// if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
|
||||
// reallocVarData(&pLastCol->colVal);
|
||||
// charge += pLastCol->colVal.value.nData;
|
||||
// }
|
||||
|
||||
// LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge,
|
||||
// tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
||||
// if (status != TAOS_LRU_STATUS_OK) {
|
||||
// code = -1;
|
||||
// }
|
||||
|
||||
// taosMemoryFree(value);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// taosMemoryFreeClear(PToFree);
|
||||
// rocksdb_free(values_list[i]);
|
||||
// }
|
||||
|
||||
// rocksMayWrite(pTsdb, true, false, true);
|
||||
|
||||
// taosMemoryFree(keys_list);
|
||||
// taosMemoryFree(keys_list_sizes);
|
||||
// taosMemoryFree(values_list);
|
||||
// taosMemoryFree(values_list_sizes);
|
||||
|
||||
// taosArrayDestroy(remainCols);
|
||||
// }
|
||||
|
||||
// taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
|
||||
// _exit:
|
||||
// taosArrayDestroy(aColVal);
|
||||
// taosMemoryFree(pTSchema);
|
||||
// return code;
|
||||
// }
|
||||
|
||||
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||
int nCols, int16_t *slotIds);
|
||||
|
||||
|
|
|
@ -664,7 +664,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
|||
}
|
||||
|
||||
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
|
||||
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
|
||||
tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData, &lRow);
|
||||
}
|
||||
|
||||
// SMemTable
|
||||
|
@ -727,7 +727,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
|||
pTbData->maxKey = key.key.ts;
|
||||
}
|
||||
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
|
||||
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
|
||||
tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, nRow, aRow, &lRow);
|
||||
}
|
||||
|
||||
// SMemTable
|
||||
|
|
|
@ -601,17 +601,21 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
|
|||
STColumn *pTColumn = &pTSchema->columns[iCol];
|
||||
SValue value;
|
||||
|
||||
ASSERT(iCol > 0);
|
||||
|
||||
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||
tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal);
|
||||
} else if (pRow->type == TSDBROW_COL_FMT) {
|
||||
SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId);
|
||||
|
||||
if (pColData) {
|
||||
tColDataGetValue(pColData, pRow->iRow, pColVal);
|
||||
if (iCol == 0) {
|
||||
*pColVal =
|
||||
COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID,
|
||||
((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pRow->pBlockData->aTSKEY[pRow->iRow]}));
|
||||
} else {
|
||||
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
|
||||
SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId);
|
||||
|
||||
if (pColData) {
|
||||
tColDataGetValue(pColData, pRow->iRow, pColVal);
|
||||
} else {
|
||||
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT(0);
|
||||
|
@ -701,25 +705,51 @@ void tsdbRowClose(STSDBRowIter *pIter) {
|
|||
}
|
||||
}
|
||||
|
||||
static SColVal *tsdbRowColIterGetValue(STSDBRowIter *pIter) {
|
||||
if (pIter->iColData == 0) {
|
||||
pIter->cv = COL_VAL_VALUE(
|
||||
PRIMARYKEY_TIMESTAMP_COL_ID,
|
||||
((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]}));
|
||||
return &pIter->cv;
|
||||
}
|
||||
|
||||
if (pIter->iColData <= pIter->pRow->pBlockData->nColData) {
|
||||
tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv);
|
||||
return &pIter->cv;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static SColVal *tsdbRowColIterNext(STSDBRowIter *pIter) {
|
||||
SColVal* pColVal = tsdbRowColIterGetValue(pIter);
|
||||
if (pColVal) {
|
||||
++pIter->iColData;
|
||||
}
|
||||
return pColVal;
|
||||
}
|
||||
|
||||
static SColVal* tsdbRowColIterMoveTo(STSDBRowIter *pIter, int32_t iCol) {
|
||||
pIter->iColData = iCol;
|
||||
return tsdbRowColIterGetValue(pIter);
|
||||
}
|
||||
|
||||
SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
|
||||
if (pIter->pRow->type == TSDBROW_ROW_FMT) {
|
||||
return tRowIterNext(pIter->pIter);
|
||||
} else if (pIter->pRow->type == TSDBROW_COL_FMT) {
|
||||
if (pIter->iColData == 0) {
|
||||
pIter->cv = COL_VAL_VALUE(
|
||||
PRIMARYKEY_TIMESTAMP_COL_ID,
|
||||
((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]}));
|
||||
++pIter->iColData;
|
||||
return &pIter->cv;
|
||||
}
|
||||
return tsdbRowColIterNext(pIter);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
return NULL; // suppress error report by compiler
|
||||
}
|
||||
}
|
||||
|
||||
if (pIter->iColData <= pIter->pRow->pBlockData->nColData) {
|
||||
tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv);
|
||||
++pIter->iColData;
|
||||
return &pIter->cv;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
SColVal *tsdbRowIterMoveTo(STSDBRowIter *pIter, int32_t iCol) {
|
||||
if (pIter->pRow->type == TSDBROW_ROW_FMT) {
|
||||
return tRowIterMoveTo(pIter->pIter, iCol);
|
||||
} else if (pIter->pRow->type == TSDBROW_COL_FMT) {
|
||||
return tsdbRowColIterMoveTo(pIter, iCol);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
return NULL; // suppress error report by compiler
|
||||
|
@ -1820,4 +1850,4 @@ uint32_t tsdbCvtTimestampAlg(uint32_t alg) {
|
|||
DEFINE_VAR(alg)
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue