cache/last: new api: merge last with cid
This commit is contained in:
parent
596bee8028
commit
dca985214c
|
@ -293,6 +293,9 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||||
|
int nCols, int16_t *slotIds);
|
||||||
|
|
||||||
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) {
|
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) {
|
||||||
static char const *alstring[2] = {"last_row", "last"};
|
static char const *alstring[2] = {"last_row", "last"};
|
||||||
char const *lstring = alstring[ltype];
|
char const *lstring = alstring[ltype];
|
||||||
|
@ -328,6 +331,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
|
||||||
taosMemoryFree(errs);
|
taosMemoryFree(errs);
|
||||||
|
|
||||||
for (int i = 0; i < num_keys; ++i) {
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
|
SArray *pTmpColArray = NULL;
|
||||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
||||||
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
|
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
|
||||||
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
|
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
|
||||||
|
@ -343,18 +347,29 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
|
||||||
} else {
|
} else {
|
||||||
// recalc: load from tsdb
|
// recalc: load from tsdb
|
||||||
if (ltype) {
|
if (ltype) {
|
||||||
SArray *pArray = NULL;
|
int16_t aCols[1] = {cid};
|
||||||
// mergeLastCid(uid, pTsdb, &pArray, pr, cid);
|
int16_t slotIds[1] = {pr->pSlotIds[i]};
|
||||||
|
pTmpColArray = NULL;
|
||||||
|
|
||||||
|
mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
|
||||||
|
if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) {
|
||||||
|
pLastCol = taosArrayGet(pTmpColArray, 0);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// mergeLastRowCid(uid, pTsdb, &pArray, pr, cid);
|
// mergeLastRowCid(uid, pTsdb, &pArray, pr, cid);
|
||||||
}
|
}
|
||||||
// still null, then make up a null col value
|
|
||||||
|
// still null, then make up a none col value
|
||||||
|
if (!pLastCol) {
|
||||||
pLastCol = &noneCol;
|
pLastCol = &noneCol;
|
||||||
|
}
|
||||||
|
|
||||||
// maybe store it back to rocks cache
|
// maybe store it back to rocks cache
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pLastArray, pLastCol);
|
taosArrayPush(pLastArray, pLastCol);
|
||||||
|
|
||||||
|
taosArrayDestroy(pTmpColArray);
|
||||||
taosMemoryFree(values_list[i]);
|
taosMemoryFree(values_list[i]);
|
||||||
}
|
}
|
||||||
taosMemoryFree(values_list);
|
taosMemoryFree(values_list);
|
||||||
|
@ -1831,6 +1846,21 @@ static int32_t initLastColArray(STSchema *pTSchema, SArray **ppColArray) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
|
||||||
|
SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
|
||||||
|
if (NULL == pColArray) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < nCols; ++i) {
|
||||||
|
int16_t slotId = slotIds[i];
|
||||||
|
SLastCol col = {.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
|
||||||
|
taosArrayPush(pColArray, &col);
|
||||||
|
}
|
||||||
|
*ppColArray = pColArray;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
|
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
|
||||||
int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
|
int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
|
||||||
*ppDst = taosMemoryMalloc(len);
|
*ppDst = taosMemoryMalloc(len);
|
||||||
|
@ -2169,6 +2199,172 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||||
|
int nCols, int16_t *slotIds) {
|
||||||
|
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
||||||
|
int16_t nLastCol = nCols;
|
||||||
|
int16_t noneCol = 0;
|
||||||
|
bool setNoneCol = false;
|
||||||
|
bool hasRow = false;
|
||||||
|
bool ignoreEarlierTs = false;
|
||||||
|
SArray *pColArray = NULL;
|
||||||
|
SColVal *pColVal = &(SColVal){0};
|
||||||
|
|
||||||
|
int32_t code = initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
|
||||||
|
if (NULL == aColArray) {
|
||||||
|
taosArrayDestroy(pColArray);
|
||||||
|
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < nCols; ++i) {
|
||||||
|
taosArrayPush(aColArray, &aCols[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
TSKEY lastRowTs = TSKEY_MAX;
|
||||||
|
|
||||||
|
CacheNextRowIter iter = {0};
|
||||||
|
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
|
||||||
|
&pr->pDataFReaderLast, pr->lastTs);
|
||||||
|
|
||||||
|
do {
|
||||||
|
TSDBROW *pRow = NULL;
|
||||||
|
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
|
||||||
|
|
||||||
|
if (!pRow) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
hasRow = true;
|
||||||
|
|
||||||
|
int32_t sversion = TSDBROW_SVERSION(pRow);
|
||||||
|
if (sversion != -1) {
|
||||||
|
code = updateTSchema(sversion, pr, uid);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pTSchema = pr->pCurrSchema;
|
||||||
|
}
|
||||||
|
// int16_t nCol = pTSchema->numOfCols;
|
||||||
|
|
||||||
|
TSKEY rowTs = TSDBROW_TS(pRow);
|
||||||
|
|
||||||
|
if (lastRowTs == TSKEY_MAX) {
|
||||||
|
lastRowTs = rowTs;
|
||||||
|
|
||||||
|
for (int16_t iCol = 0; iCol < nCols; ++iCol) {
|
||||||
|
if (iCol >= nLastCol) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
SLastCol *pCol = taosArrayGet(pColArray, iCol);
|
||||||
|
if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||||
|
|
||||||
|
*pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal};
|
||||||
|
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
|
||||||
|
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
|
||||||
|
if (pCol->colVal.value.pData == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!COL_VAL_IS_VALUE(pColVal)) {
|
||||||
|
if (!setNoneCol) {
|
||||||
|
noneCol = iCol;
|
||||||
|
setNoneCol = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
|
||||||
|
if (aColIndex >= 0) {
|
||||||
|
taosArrayRemove(aColArray, aColIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!setNoneCol) {
|
||||||
|
// done, goto return pColArray
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// merge into pColArray
|
||||||
|
setNoneCol = false;
|
||||||
|
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
|
||||||
|
if (iCol >= nLastCol) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// high version's column value
|
||||||
|
SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
|
||||||
|
if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
SColVal *tColVal = &lastColVal->colVal;
|
||||||
|
|
||||||
|
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||||
|
if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
|
||||||
|
SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
|
||||||
|
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
|
||||||
|
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
|
||||||
|
taosMemoryFree(pLastCol->colVal.value.pData);
|
||||||
|
|
||||||
|
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
|
||||||
|
if (lastCol.colVal.value.pData == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArraySet(pColArray, iCol, &lastCol);
|
||||||
|
int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
|
||||||
|
taosArrayRemove(aColArray, aColIndex);
|
||||||
|
} else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
|
||||||
|
noneCol = iCol;
|
||||||
|
setNoneCol = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (setNoneCol);
|
||||||
|
|
||||||
|
// if (taosArrayGetSize(pColArray) <= 0) {
|
||||||
|
//*ppLastArray = NULL;
|
||||||
|
// taosArrayDestroy(pColArray);
|
||||||
|
//} else {
|
||||||
|
if (!hasRow) {
|
||||||
|
if (ignoreEarlierTs) {
|
||||||
|
taosArrayDestroy(pColArray);
|
||||||
|
pColArray = NULL;
|
||||||
|
} else {
|
||||||
|
taosArrayClear(pColArray);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*ppLastArray = pColArray;
|
||||||
|
//}
|
||||||
|
|
||||||
|
nextRowIterClose(&iter);
|
||||||
|
taosArrayDestroy(aColArray);
|
||||||
|
// taosMemoryFreeClear(pTSchema);
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
nextRowIterClose(&iter);
|
||||||
|
// taosMemoryFreeClear(pTSchema);
|
||||||
|
*ppLastArray = NULL;
|
||||||
|
taosArrayDestroy(pColArray);
|
||||||
|
taosArrayDestroy(aColArray);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
|
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
char key[32] = {0};
|
char key[32] = {0};
|
||||||
|
|
Loading…
Reference in New Issue