cache/stt: remove ts from merge tree loading
This commit is contained in:
parent
756fefaa29
commit
86e3c1d20d
|
@ -716,9 +716,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
||||||
SLRUCache *pCache = pTsdb->lruCache;
|
SLRUCache *pCache = pTsdb->lruCache;
|
||||||
for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
|
for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
|
||||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
||||||
SIdxKey *idxKey = taosArrayGet(remainCols, j);
|
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
|
||||||
int16_t cid = idxKey->key.cid;
|
|
||||||
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
|
|
||||||
if (pLastCol) {
|
if (pLastCol) {
|
||||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||||
*pTmpLastCol = *pLastCol;
|
*pTmpLastCol = *pLastCol;
|
||||||
|
@ -747,6 +745,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
||||||
++j;
|
++j;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(values_list);
|
taosMemoryFree(values_list);
|
||||||
taosMemoryFree(values_list_sizes);
|
taosMemoryFree(values_list_sizes);
|
||||||
|
|
||||||
|
@ -759,14 +758,13 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
||||||
|
|
||||||
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
|
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
SArray *remainCols = NULL;
|
||||||
SLRUCache *pCache = pTsdb->lruCache;
|
SLRUCache *pCache = pTsdb->lruCache;
|
||||||
SArray *pCidList = pr->pCidList;
|
SArray *pCidList = pr->pCidList;
|
||||||
int num_keys = TARRAY_SIZE(pCidList);
|
int num_keys = TARRAY_SIZE(pCidList);
|
||||||
|
|
||||||
SArray *remainCols = NULL;
|
|
||||||
|
|
||||||
for (int i = 0; i < num_keys; ++i) {
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
|
int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
|
||||||
|
|
||||||
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
|
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
|
||||||
|
|
||||||
|
@ -775,7 +773,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
||||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||||
|
|
||||||
SLastCol lastCol = *pLastCol;
|
SLastCol lastCol = *pLastCol;
|
||||||
reallocVarData(&lastCol.colVal);
|
// reallocVarData(&lastCol.colVal);
|
||||||
taosArrayPush(pLastArray, &lastCol);
|
taosArrayPush(pLastArray, &lastCol);
|
||||||
|
|
||||||
if (h) {
|
if (h) {
|
||||||
|
@ -796,7 +794,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
||||||
if (remainCols && TARRAY_SIZE(remainCols) > 0) {
|
if (remainCols && TARRAY_SIZE(remainCols) > 0) {
|
||||||
taosThreadMutexLock(&pTsdb->lruMutex);
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
for (int i = 0; i < TARRAY_SIZE(remainCols);) {
|
for (int i = 0; i < TARRAY_SIZE(remainCols);) {
|
||||||
SIdxKey *idxKey = taosArrayGet(remainCols, i);
|
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
|
||||||
LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
|
LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
|
||||||
if (h) {
|
if (h) {
|
||||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||||
|
@ -1551,9 +1549,15 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int nTmpCols = nCols;
|
||||||
|
bool hasTs = false;
|
||||||
|
if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
|
--nTmpCols;
|
||||||
|
hasTs = true;
|
||||||
|
}
|
||||||
for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
|
for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
|
||||||
state->pLoadInfo[i].colIds = aCols;
|
state->pLoadInfo[i].colIds = hasTs ? aCols + 1 : aCols;
|
||||||
state->pLoadInfo[i].numOfCols = nCols;
|
state->pLoadInfo[i].numOfCols = nTmpCols;
|
||||||
state->pLoadInfo[i].isLast = isLast;
|
state->pLoadInfo[i].isLast = isLast;
|
||||||
}
|
}
|
||||||
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
|
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
|
||||||
|
|
|
@ -306,12 +306,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
tsdbCacheGetBatch(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
|
tsdbCacheGetBatch(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
|
||||||
// tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
|
// tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
|
||||||
if (TARRAY_SIZE(pRow) <= 0) {
|
if (TARRAY_SIZE(pRow) <= 0) {
|
||||||
taosArrayClearEx(pRow, freeItem);
|
// taosArrayClearEx(pRow, freeItem);
|
||||||
|
taosArrayClear(pRow);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SLastCol* pColVal = taosArrayGet(pRow, 0);
|
SLastCol* pColVal = taosArrayGet(pRow, 0);
|
||||||
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
|
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
|
||||||
taosArrayClearEx(pRow, freeItem);
|
// taosArrayClearEx(pRow, freeItem);
|
||||||
|
taosArrayClear(pRow);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -368,7 +370,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
|
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayClearEx(pRow, freeItem);
|
// taosArrayClearEx(pRow, freeItem);
|
||||||
|
taosArrayClear(pRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasRes) {
|
if (hasRes) {
|
||||||
|
@ -380,17 +383,20 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
|
|
||||||
tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
|
tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
|
||||||
if (TARRAY_SIZE(pRow) <= 0) {
|
if (TARRAY_SIZE(pRow) <= 0) {
|
||||||
taosArrayClearEx(pRow, freeItem);
|
// taosArrayClearEx(pRow, freeItem);
|
||||||
|
taosArrayClear(pRow);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
|
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
|
||||||
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
|
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
|
||||||
taosArrayClearEx(pRow, freeItem);
|
// taosArrayClearEx(pRow, freeItem);
|
||||||
|
taosArrayClear(pRow);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
|
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
|
||||||
taosArrayClearEx(pRow, freeItem);
|
// taosArrayClearEx(pRow, freeItem);
|
||||||
|
taosArrayClear(pRow);
|
||||||
|
|
||||||
taosArrayPush(pTableUidList, &uid);
|
taosArrayPush(pTableUidList, &uid);
|
||||||
|
|
||||||
|
@ -418,7 +424,8 @@ _end:
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pRes);
|
taosMemoryFree(pRes);
|
||||||
taosArrayDestroyEx(pRow, freeItem);
|
// taosArrayDestroyEx(pRow, freeItem);
|
||||||
|
taosArrayDestroy(pRow);
|
||||||
taosArrayDestroyEx(pLastCols, freeItem);
|
taosArrayDestroyEx(pLastCols, freeItem);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue