Merge pull request #25800 from taosdata/fix/TD-29995-3.0
fix: last cache update rule
This commit is contained in:
commit
b19298f8e0
|
@ -922,9 +922,16 @@ typedef struct {
|
||||||
SColVal colVal;
|
SColVal colVal;
|
||||||
} SLastCol;
|
} SLastCol;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t lflag;
|
||||||
|
STsdbRowKey tsdbRowKey;
|
||||||
|
SColVal colVal;
|
||||||
|
} SLastUpdateCtx;
|
||||||
|
|
||||||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||||
void tsdbCloseCache(STsdb *pTsdb);
|
void tsdbCloseCache(STsdb *pTsdb);
|
||||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
|
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow, SRow **aRow);
|
||||||
|
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData);
|
||||||
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||||
|
|
||||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
|
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
|
||||||
|
|
|
@ -998,46 +998,30 @@ static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
|
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
|
||||||
|
if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
// 1, fetch schema
|
int num_keys = TARRAY_SIZE(updCtxArray);
|
||||||
STSchema *pTSchema = NULL;
|
|
||||||
int32_t sver = TSDBROW_SVERSION(pRow);
|
|
||||||
|
|
||||||
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
terrno = code;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2, iterate col values into array
|
|
||||||
SArray *aColVal = taosArrayInit(32, sizeof(SColVal));
|
|
||||||
|
|
||||||
STSDBRowIter iter = {0};
|
|
||||||
tsdbRowIterOpen(&iter, pRow, pTSchema);
|
|
||||||
|
|
||||||
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
|
|
||||||
taosArrayPush(aColVal, pColVal);
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbRowClose(&iter);
|
|
||||||
|
|
||||||
// 3, build keys & multi get from rocks
|
|
||||||
int num_keys = TARRAY_SIZE(aColVal);
|
|
||||||
SArray *remainCols = NULL;
|
SArray *remainCols = NULL;
|
||||||
SLRUCache *pCache = pTsdb->lruCache;
|
SLRUCache *pCache = pTsdb->lruCache;
|
||||||
|
|
||||||
STsdbRowKey tsdbRowKey = {0};
|
|
||||||
tsdbRowGetKey(pRow, &tsdbRowKey);
|
|
||||||
SRowKey *pRowKey = &tsdbRowKey.key;
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pTsdb->lruMutex);
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
for (int i = 0; i < num_keys; ++i) {
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
|
SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i);
|
||||||
int16_t cid = pColVal->cid;
|
|
||||||
|
|
||||||
SLastKey *key = &(SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};
|
int8_t lflag = updCtx->lflag;
|
||||||
|
SRowKey *pRowKey = &updCtx->tsdbRowKey.key;
|
||||||
|
SColVal *pColVal = &updCtx->colVal;
|
||||||
|
|
||||||
|
if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SLastKey *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
|
||||||
size_t klen = ROCKS_KEY_LEN;
|
size_t klen = ROCKS_KEY_LEN;
|
||||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
||||||
if (h) {
|
if (h) {
|
||||||
|
@ -1053,23 +1037,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
||||||
}
|
}
|
||||||
taosArrayPush(remainCols, &(SIdxKey){i, *key});
|
taosArrayPush(remainCols, &(SIdxKey){i, *key});
|
||||||
}
|
}
|
||||||
|
|
||||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
|
||||||
key->lflag = LFLAG_LAST;
|
|
||||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
|
||||||
if (h) {
|
|
||||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
|
||||||
if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) {
|
|
||||||
tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal);
|
|
||||||
}
|
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
|
||||||
} else {
|
|
||||||
if (!remainCols) {
|
|
||||||
remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
|
|
||||||
}
|
|
||||||
taosArrayPush(remainCols, &(SIdxKey){i, *key});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (remainCols) {
|
if (remainCols) {
|
||||||
|
@ -1097,12 +1064,17 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
||||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||||
for (int i = 0; i < num_keys; ++i) {
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
|
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
|
||||||
SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx;
|
SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i);
|
||||||
// SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx);
|
SRowKey *pRowKey = &updCtx->tsdbRowKey.key;
|
||||||
|
SColVal *pColVal = &updCtx->colVal;
|
||||||
|
|
||||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]);
|
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]);
|
||||||
SLastCol *PToFree = pLastCol;
|
SLastCol *PToFree = pLastCol;
|
||||||
|
|
||||||
|
if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (IS_LAST_ROW_KEY(idxKey->key)) {
|
if (IS_LAST_ROW_KEY(idxKey->key)) {
|
||||||
int32_t cmp_res = 1;
|
int32_t cmp_res = 1;
|
||||||
if (pLastCol) {
|
if (pLastCol) {
|
||||||
|
@ -1148,48 +1120,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
||||||
|
|
||||||
taosMemoryFree(value);
|
taosMemoryFree(value);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
|
||||||
if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) {
|
|
||||||
char *value = NULL;
|
|
||||||
size_t vlen = 0;
|
|
||||||
SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal};
|
|
||||||
tsdbCacheSerialize(&lastColTmp, &value, &vlen);
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
|
||||||
|
|
||||||
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
|
||||||
|
|
||||||
pLastCol = &lastColTmp;
|
|
||||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
|
||||||
*pTmpLastCol = *pLastCol;
|
|
||||||
pLastCol = pTmpLastCol;
|
|
||||||
|
|
||||||
size_t charge = sizeof(*pLastCol);
|
|
||||||
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
|
||||||
SValue *pValue = &pLastCol->rowKey.pks[i];
|
|
||||||
if (IS_VAR_DATA_TYPE(pValue->type)) {
|
|
||||||
reallocVarDataVal(pValue);
|
|
||||||
charge += pValue->nData;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
|
|
||||||
reallocVarData(&pLastCol->colVal);
|
|
||||||
charge += pLastCol->colVal.value.nData;
|
|
||||||
}
|
|
||||||
|
|
||||||
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge,
|
|
||||||
tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
|
||||||
if (status != TAOS_LRU_STATUS_OK) {
|
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFree(value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFreeClear(PToFree);
|
taosMemoryFreeClear(PToFree);
|
||||||
|
@ -1209,11 +1139,152 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
||||||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
taosArrayDestroy(aColVal);
|
|
||||||
taosMemoryFree(pTSchema);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
|
||||||
|
SRow **aRow) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
// 1. prepare last
|
||||||
|
TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
|
||||||
|
|
||||||
|
STSchema *pTSchema = NULL;
|
||||||
|
int32_t sver = TSDBROW_SVERSION(&lRow);
|
||||||
|
SArray *ctxArray = NULL;
|
||||||
|
SSHashObj *iColHash = NULL;
|
||||||
|
|
||||||
|
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
|
||||||
|
int32_t nCol = pTSchema->numOfCols;
|
||||||
|
|
||||||
|
ctxArray = taosArrayInit(nCol, sizeof(SLastUpdateCtx));
|
||||||
|
iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
|
||||||
|
|
||||||
|
// 1. prepare by lrow
|
||||||
|
STsdbRowKey tsdbRowKey = {0};
|
||||||
|
tsdbRowGetKey(&lRow, &tsdbRowKey);
|
||||||
|
|
||||||
|
STSDBRowIter iter = {0};
|
||||||
|
tsdbRowIterOpen(&iter, &lRow, pTSchema);
|
||||||
|
int32_t iCol = 0;
|
||||||
|
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
|
||||||
|
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
|
||||||
|
taosArrayPush(ctxArray, &updateCtx);
|
||||||
|
|
||||||
|
if (!COL_VAL_IS_VALUE(pColVal)) {
|
||||||
|
tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
updateCtx.lflag = LFLAG_LAST;
|
||||||
|
taosArrayPush(ctxArray, &updateCtx);
|
||||||
|
}
|
||||||
|
tsdbRowClose(&iter);
|
||||||
|
|
||||||
|
// 2. prepare by the other rows
|
||||||
|
for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
|
||||||
|
if (tSimpleHashGetSize(iColHash) == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tRow.pTSRow = aRow[iRow];
|
||||||
|
|
||||||
|
STsdbRowKey tsdbRowKey = {0};
|
||||||
|
tsdbRowGetKey(&tRow, &tsdbRowKey);
|
||||||
|
|
||||||
|
void *pIte = NULL;
|
||||||
|
int32_t iter = 0;
|
||||||
|
while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
|
||||||
|
int32_t iCol = ((int32_t *)pIte)[0];
|
||||||
|
SColVal colVal = COL_VAL_NONE(0, 0);
|
||||||
|
tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
|
||||||
|
|
||||||
|
if (COL_VAL_IS_VALUE(&colVal)) {
|
||||||
|
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
|
||||||
|
taosArrayPush(ctxArray, &updateCtx);
|
||||||
|
tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. do update
|
||||||
|
tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
taosMemoryFreeClear(pTSchema);
|
||||||
|
taosArrayDestroy(ctxArray);
|
||||||
|
tSimpleHashCleanup(iColHash);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
|
||||||
|
|
||||||
|
STSchema *pTSchema = NULL;
|
||||||
|
int32_t sver = TSDBROW_SVERSION(&lRow);
|
||||||
|
SArray *ctxArray = NULL;
|
||||||
|
|
||||||
|
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
|
||||||
|
|
||||||
|
// 1. prepare last
|
||||||
|
TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
|
||||||
|
|
||||||
|
for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
|
||||||
|
SColData *pColData = &pBlockData->aColData[iColData];
|
||||||
|
if ((pColData->flag & HAS_VALUE) != HAS_VALUE) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) {
|
||||||
|
STsdbRowKey tsdbRowKey = {0};
|
||||||
|
tsdbRowGetKey(&tRow, &tsdbRowKey);
|
||||||
|
|
||||||
|
uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow);
|
||||||
|
if (colType == 2) {
|
||||||
|
SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
|
||||||
|
tColDataGetValue(pColData, tRow.iRow, &colVal);
|
||||||
|
|
||||||
|
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
|
||||||
|
taosArrayPush(ctxArray, &updateCtx);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. prepare last row
|
||||||
|
STsdbRowKey tsdbRowKey = {0};
|
||||||
|
tsdbRowGetKey(&lRow, &tsdbRowKey);
|
||||||
|
|
||||||
|
STSDBRowIter iter = {0};
|
||||||
|
tsdbRowIterOpen(&iter, &lRow, pTSchema);
|
||||||
|
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
|
||||||
|
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
|
||||||
|
taosArrayPush(ctxArray, &updateCtx);
|
||||||
|
}
|
||||||
|
tsdbRowClose(&iter);
|
||||||
|
|
||||||
|
// 3. do update
|
||||||
|
tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
taosMemoryFreeClear(pTSchema);
|
||||||
|
taosArrayDestroy(ctxArray);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||||
int nCols, int16_t *slotIds);
|
int nCols, int16_t *slotIds);
|
||||||
|
|
||||||
|
|
|
@ -629,14 +629,12 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
||||||
TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
|
TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
|
||||||
STsdbRowKey key;
|
STsdbRowKey key;
|
||||||
TSDBROW lRow; // last row
|
|
||||||
|
|
||||||
// first row
|
// first row
|
||||||
tsdbRowGetKey(&tRow, &key);
|
tsdbRowGetKey(&tRow, &key);
|
||||||
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
|
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
|
||||||
if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
|
if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
|
||||||
pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
|
pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
|
||||||
lRow = tRow;
|
|
||||||
|
|
||||||
// remain row
|
// remain row
|
||||||
++tRow.iRow;
|
++tRow.iRow;
|
||||||
|
@ -653,7 +651,6 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit;
|
if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit;
|
||||||
lRow = tRow;
|
|
||||||
|
|
||||||
++tRow.iRow;
|
++tRow.iRow;
|
||||||
}
|
}
|
||||||
|
@ -664,7 +661,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
|
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
|
||||||
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
|
tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData);
|
||||||
}
|
}
|
||||||
|
|
||||||
// SMemTable
|
// SMemTable
|
||||||
|
@ -688,7 +685,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
||||||
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
|
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
|
||||||
int32_t iRow = 0;
|
int32_t iRow = 0;
|
||||||
TSDBROW lRow;
|
|
||||||
|
|
||||||
// backward put first data
|
// backward put first data
|
||||||
tRow.pTSRow = aRow[iRow++];
|
tRow.pTSRow = aRow[iRow++];
|
||||||
|
@ -696,7 +692,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
|
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
|
||||||
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
|
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
lRow = tRow;
|
|
||||||
|
|
||||||
pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
|
pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
|
||||||
|
|
||||||
|
@ -717,8 +712,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1);
|
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
|
||||||
lRow = tRow;
|
|
||||||
|
|
||||||
iRow++;
|
iRow++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -727,7 +720,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
pTbData->maxKey = key.key.ts;
|
pTbData->maxKey = key.key.ts;
|
||||||
}
|
}
|
||||||
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
|
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
|
||||||
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
|
tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
// SMemTable
|
// SMemTable
|
||||||
|
|
|
@ -601,17 +601,21 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
|
||||||
STColumn *pTColumn = &pTSchema->columns[iCol];
|
STColumn *pTColumn = &pTSchema->columns[iCol];
|
||||||
SValue value;
|
SValue value;
|
||||||
|
|
||||||
ASSERT(iCol > 0);
|
|
||||||
|
|
||||||
if (pRow->type == TSDBROW_ROW_FMT) {
|
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||||
tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal);
|
tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal);
|
||||||
} else if (pRow->type == TSDBROW_COL_FMT) {
|
} else if (pRow->type == TSDBROW_COL_FMT) {
|
||||||
SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId);
|
if (iCol == 0) {
|
||||||
|
*pColVal =
|
||||||
if (pColData) {
|
COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID,
|
||||||
tColDataGetValue(pColData, pRow->iRow, pColVal);
|
((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pRow->pBlockData->aTSKEY[pRow->iRow]}));
|
||||||
} else {
|
} else {
|
||||||
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
|
SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId);
|
||||||
|
|
||||||
|
if (pColData) {
|
||||||
|
tColDataGetValue(pColData, pRow->iRow, pColVal);
|
||||||
|
} else {
|
||||||
|
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -1817,4 +1821,4 @@ uint32_t tsdbCvtTimestampAlg(uint32_t alg) {
|
||||||
DEFINE_VAR(alg)
|
DEFINE_VAR(alg)
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue