cache/last_row cid: first round implementation for last_row with cid
This commit is contained in:
parent
5e53782b20
commit
9b275f7421
|
@ -324,6 +324,9 @@ _exit:
|
||||||
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||||
int nCols, int16_t *slotIds);
|
int nCols, int16_t *slotIds);
|
||||||
|
|
||||||
|
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||||
|
int nCols, int16_t *slotIds);
|
||||||
|
|
||||||
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) {
|
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) {
|
||||||
static char const *alstring[2] = {"last_row", "last"};
|
static char const *alstring[2] = {"last_row", "last"};
|
||||||
char const *lstring = alstring[ltype];
|
char const *lstring = alstring[ltype];
|
||||||
|
@ -374,17 +377,18 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// recalc: load from tsdb
|
// recalc: load from tsdb
|
||||||
if (ltype) {
|
int16_t aCols[1] = {cid};
|
||||||
int16_t aCols[1] = {cid};
|
int16_t slotIds[1] = {pr->pSlotIds[i]};
|
||||||
int16_t slotIds[1] = {pr->pSlotIds[i]};
|
pTmpColArray = NULL;
|
||||||
pTmpColArray = NULL;
|
|
||||||
|
|
||||||
|
if (ltype) {
|
||||||
mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
|
mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
|
||||||
if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) {
|
|
||||||
pLastCol = taosArrayGet(pTmpColArray, 0);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// mergeLastRowCid(uid, pTsdb, &pArray, pr, cid);
|
mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) {
|
||||||
|
pLastCol = taosArrayGet(pTmpColArray, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// still null, then make up a none col value
|
// still null, then make up a none col value
|
||||||
|
@ -392,7 +396,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
|
||||||
pLastCol = &noneCol;
|
pLastCol = &noneCol;
|
||||||
}
|
}
|
||||||
|
|
||||||
// maybe store it back to rocks cache
|
// store result back to rocks cache
|
||||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||||
char *value = NULL;
|
char *value = NULL;
|
||||||
size_t vlen = 0;
|
size_t vlen = 0;
|
||||||
|
@ -2410,6 +2414,174 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||||
|
int nCols, int16_t *slotIds) {
|
||||||
|
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
||||||
|
int16_t nLastCol = nCols;
|
||||||
|
int16_t noneCol = 0;
|
||||||
|
bool setNoneCol = false;
|
||||||
|
bool hasRow = false;
|
||||||
|
bool ignoreEarlierTs = false;
|
||||||
|
SArray *pColArray = NULL;
|
||||||
|
SColVal *pColVal = &(SColVal){0};
|
||||||
|
|
||||||
|
int32_t code = initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
|
||||||
|
if (NULL == aColArray) {
|
||||||
|
taosArrayDestroy(pColArray);
|
||||||
|
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < nCols; ++i) {
|
||||||
|
taosArrayPush(aColArray, &aCols[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
TSKEY lastRowTs = TSKEY_MAX;
|
||||||
|
|
||||||
|
CacheNextRowIter iter = {0};
|
||||||
|
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
|
||||||
|
&pr->pDataFReaderLast, pr->lastTs);
|
||||||
|
|
||||||
|
do {
|
||||||
|
TSDBROW *pRow = NULL;
|
||||||
|
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
|
||||||
|
|
||||||
|
if (!pRow) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
hasRow = true;
|
||||||
|
|
||||||
|
int32_t sversion = TSDBROW_SVERSION(pRow);
|
||||||
|
if (sversion != -1) {
|
||||||
|
code = updateTSchema(sversion, pr, uid);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pTSchema = pr->pCurrSchema;
|
||||||
|
}
|
||||||
|
// int16_t nCol = pTSchema->numOfCols;
|
||||||
|
|
||||||
|
TSKEY rowTs = TSDBROW_TS(pRow);
|
||||||
|
|
||||||
|
if (lastRowTs == TSKEY_MAX) {
|
||||||
|
lastRowTs = rowTs;
|
||||||
|
|
||||||
|
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
|
||||||
|
if (iCol >= nLastCol) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
SLastCol *pCol = taosArrayGet(pColArray, iCol);
|
||||||
|
if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (slotIds[iCol] == 0) {
|
||||||
|
STColumn *pTColumn = &pTSchema->columns[0];
|
||||||
|
|
||||||
|
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = rowTs});
|
||||||
|
taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||||
|
|
||||||
|
*pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal};
|
||||||
|
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
|
||||||
|
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
|
||||||
|
if (pCol->colVal.value.pData == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (COL_VAL_IS_NONE(pColVal)) {
|
||||||
|
if (!setNoneCol) {
|
||||||
|
noneCol = iCol;
|
||||||
|
setNoneCol = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
|
||||||
|
if (aColIndex >= 0) {
|
||||||
|
taosArrayRemove(aColArray, aColIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!setNoneCol) {
|
||||||
|
// done, goto return pColArray
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// merge into pColArray
|
||||||
|
setNoneCol = false;
|
||||||
|
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
|
||||||
|
if (iCol >= nLastCol) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// high version's column value
|
||||||
|
SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
|
||||||
|
if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
SColVal *tColVal = &lastColVal->colVal;
|
||||||
|
|
||||||
|
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||||
|
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
|
||||||
|
SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
|
||||||
|
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
|
||||||
|
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
|
||||||
|
taosMemoryFree(pLastCol->colVal.value.pData);
|
||||||
|
|
||||||
|
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
|
||||||
|
if (lastCol.colVal.value.pData == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArraySet(pColArray, iCol, &lastCol);
|
||||||
|
int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
|
||||||
|
taosArrayRemove(aColArray, aColIndex);
|
||||||
|
} else if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
|
||||||
|
noneCol = iCol;
|
||||||
|
setNoneCol = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (setNoneCol);
|
||||||
|
|
||||||
|
if (!hasRow) {
|
||||||
|
if (ignoreEarlierTs) {
|
||||||
|
taosArrayDestroy(pColArray);
|
||||||
|
pColArray = NULL;
|
||||||
|
} else {
|
||||||
|
taosArrayClear(pColArray);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*ppLastArray = pColArray;
|
||||||
|
|
||||||
|
nextRowIterClose(&iter);
|
||||||
|
taosArrayDestroy(aColArray);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
nextRowIterClose(&iter);
|
||||||
|
|
||||||
|
*ppLastArray = NULL;
|
||||||
|
taosArrayDestroy(pColArray);
|
||||||
|
taosArrayDestroy(aColArray);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
|
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
char key[32] = {0};
|
char key[32] = {0};
|
||||||
|
|
Loading…
Reference in New Issue