cache/read: use new interface with signle query
This commit is contained in:
parent
0394c6a50c
commit
01d95b5e95
|
@ -273,6 +273,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
|||
taosMemoryFree(value);
|
||||
}
|
||||
}
|
||||
|
||||
rocksdb_free(values_list[i]);
|
||||
}
|
||||
taosMemoryFree(values_list);
|
||||
taosMemoryFree(values_list_sizes);
|
||||
|
|
|
@ -199,7 +199,7 @@ void* tsdbCacherowsReaderClose(void* pReader) {
|
|||
taosMemoryFree(pReader);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow,
|
||||
LRUHandle** h) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -222,7 +222,7 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint
|
|||
|
||||
return code;
|
||||
}
|
||||
|
||||
*/
|
||||
static void freeItem(void* pItem) {
|
||||
SLastCol* pCol = (SLastCol*)pItem;
|
||||
if (IS_VAR_DATA_TYPE(pCol->colVal.type)) {
|
||||
|
@ -279,7 +279,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
p->ts = INT64_MIN;
|
||||
}
|
||||
|
||||
pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol));
|
||||
pLastCols = taosArrayInit(pr->numOfCols, sizeof(SLastCol));
|
||||
if (pLastCols == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
|
@ -303,6 +303,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
pr->pDataFReader = NULL;
|
||||
pr->pDataFReaderLast = NULL;
|
||||
|
||||
char const* lstring = pr->type & CACHESCAN_RETRIEVE_LAST ? "last" : "last_row";
|
||||
|
||||
// retrieve the only one last row of all tables in the uid list.
|
||||
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
@ -310,16 +312,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
for (int32_t i = 0; i < pr->numOfTables; ++i) {
|
||||
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
||||
|
||||
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
if (h == NULL) {
|
||||
continue;
|
||||
}
|
||||
if (taosArrayGetSize(pRow) <= 0) {
|
||||
tsdbCacheRelease(lruCache, h);
|
||||
tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, &pRow, pr, lstring);
|
||||
if (TARRAY_SIZE(pRow) <= 0) {
|
||||
taosArrayDestroy(pRow);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -327,47 +322,34 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
bool hasNotNullRow = true;
|
||||
int64_t singleTableLastTs = INT64_MAX;
|
||||
for (int32_t k = 0; k < pr->numOfCols; ++k) {
|
||||
int32_t slotId = slotIds[k];
|
||||
SLastCol* p = taosArrayGet(pLastCols, k);
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k);
|
||||
|
||||
if (slotId == -1) { // the primary timestamp
|
||||
SLastCol* p = taosArrayGet(pLastCols, 0);
|
||||
SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0);
|
||||
if (pCol->ts > p->ts) {
|
||||
hasRes = true;
|
||||
p->ts = pCol->ts;
|
||||
p->colVal = pCol->colVal;
|
||||
singleTableLastTs = pCol->ts;
|
||||
if (pColVal->ts > p->ts) {
|
||||
if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
if (!COL_VAL_IS_VALUE(&p->colVal)) {
|
||||
hasNotNullRow = false;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
SLastCol* p = taosArrayGet(pLastCols, slotId);
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
||||
|
||||
if (pColVal->ts > p->ts) {
|
||||
if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
if (!COL_VAL_IS_VALUE(&p->colVal)) {
|
||||
hasNotNullRow = false;
|
||||
}
|
||||
continue;
|
||||
hasRes = true;
|
||||
p->ts = pColVal->ts;
|
||||
if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
singleTableLastTs = pColVal->ts;
|
||||
}
|
||||
|
||||
if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
p->colVal = pColVal->colVal;
|
||||
} else {
|
||||
if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
|
||||
memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
||||
}
|
||||
|
||||
hasRes = true;
|
||||
p->ts = pColVal->ts;
|
||||
if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
singleTableLastTs = pColVal->ts;
|
||||
}
|
||||
|
||||
if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
p->colVal = pColVal->colVal;
|
||||
} else {
|
||||
if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
|
||||
memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
||||
}
|
||||
|
||||
p->colVal.value.nData = pColVal->colVal.value.nData;
|
||||
p->colVal.type = pColVal->colVal.type;
|
||||
p->colVal.flag = pColVal->colVal.flag;
|
||||
p->colVal.cid = pColVal->colVal.cid;
|
||||
}
|
||||
p->colVal.value.nData = pColVal->colVal.value.nData;
|
||||
p->colVal.type = pColVal->colVal.type;
|
||||
p->colVal.flag = pColVal->colVal.flag;
|
||||
p->colVal.cid = pColVal->colVal.cid;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -389,7 +371,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
|
||||
}
|
||||
|
||||
tsdbCacheRelease(lruCache, h);
|
||||
taosArrayDestroy(pRow);
|
||||
}
|
||||
|
||||
if (hasRes) {
|
||||
|
@ -398,28 +380,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
||||
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
|
||||
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
||||
/*
|
||||
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
if (h == NULL) {
|
||||
continue;
|
||||
}
|
||||
if (taosArrayGetSize(pRow) <= 0) {
|
||||
tsdbCacheRelease(lruCache, h);
|
||||
continue;
|
||||
}
|
||||
|
||||
saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr);
|
||||
// TODO reset the pRes
|
||||
tsdbCacheRelease(lruCache, h);
|
||||
*/
|
||||
|
||||
char const* lstring = pr->type & CACHESCAN_RETRIEVE_LAST ? "last" : "last_row";
|
||||
tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, &pRow, pr, lstring);
|
||||
if (TARRAY_SIZE(pRow) <= 0) {
|
||||
taosArrayDestroy(pRow);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue