tsdbCacheRead: use new H version api of last_row

This commit is contained in:
Minglei Jin 2022-06-29 18:10:25 +08:00
parent ac7f5190aa
commit 5e793c2f3f
2 changed files with 35 additions and 24 deletions

View File

@ -1087,7 +1087,8 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
code = mergeLastRow(uid, pTsdb, &pRow); code = mergeLastRow(uid, pTsdb, &pRow);
// if table's empty or error, return code of -1 // if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) { if (code < 0 || pRow == NULL) {
return -1; *handle = NULL;
return 0;
} }
tsdbCacheInsertLastrow(pCache, uid, pRow); tsdbCacheInsertLastrow(pCache, uid, pRow);
@ -1116,7 +1117,8 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
code = mergeLast(uid, pTsdb, &pRow); code = mergeLast(uid, pTsdb, &pRow);
// if table's empty or error, return code of -1 // if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) { if (code < 0 || pRow == NULL) {
return -1; *handle = NULL;
return 0;
} }
tsdbCacheInsertLast(pCache, uid, pRow); tsdbCacheInsertLast(pCache, uid, pRow);

View File

@ -22,15 +22,15 @@ typedef struct SLastrowReader {
SVnode* pVnode; SVnode* pVnode;
STSchema* pSchema; STSchema* pSchema;
uint64_t uid; uint64_t uid;
// int32_t* pSlotIds; // int32_t* pSlotIds;
char** transferBuf; // todo remove it soon char** transferBuf; // todo remove it soon
int32_t numOfCols; int32_t numOfCols;
int32_t type; int32_t type;
int32_t tableIndex; // currently returned result tables int32_t tableIndex; // currently returned result tables
SArray* pTableList; // table id list SArray* pTableList; // table id list
} SLastrowReader; } SLastrowReader;
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t *slotIds) { static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) {
int32_t numOfRows = pBlock->info.rows; int32_t numOfRows = pBlock->info.rows;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
@ -60,21 +60,21 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade
pBlock->info.rows += 1; pBlock->info.rows += 1;
} }
int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols,
int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols, void** pReader) { void** pReader) {
SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader)); SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader));
if (p == NULL) { if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
p->type = type; p->type = type;
p->pVnode = pVnode; p->pVnode = pVnode;
p->numOfCols = numOfCols; p->numOfCols = numOfCols;
p->transferBuf = taosMemoryCalloc(p->numOfCols, POINTER_BYTES); p->transferBuf = taosMemoryCalloc(p->numOfCols, POINTER_BYTES);
STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0); STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0);
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1); p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1);
p->pTableList = pTableIdList; p->pTableList = pTableIdList;
#if 0 #if 0
for(int32_t i = 0; i < p->numOfCols; ++i) { for(int32_t i = 0; i < p->numOfCols; ++i) {
for(int32_t j = 0; j < p->pSchema->numOfCols; ++j) { for(int32_t j = 0; j < p->pSchema->numOfCols; ++j) {
@ -101,7 +101,7 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
int32_t tsdbLastrowReaderClose(void* pReader) { int32_t tsdbLastrowReaderClose(void* pReader) {
SLastrowReader* p = pReader; SLastrowReader* p = pReader;
for(int32_t i = 0; i < p->numOfCols; ++i) { for (int32_t i = 0; i < p->numOfCols; ++i) {
taosMemoryFreeClear(p->transferBuf[i]); taosMemoryFreeClear(p->transferBuf[i]);
} }
@ -117,8 +117,9 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
SLastrowReader* pr = pReader; SLastrowReader* pr = pReader;
STSRow* pRow = NULL; LRUHandle* h = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList); STSRow* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList);
// retrieve the only one last row of all tables in the uid list. // retrieve the only one last row of all tables in the uid list.
if (pr->type == LASTROW_RETRIEVE_TYPE_SINGLE) { if (pr->type == LASTROW_RETRIEVE_TYPE_SINGLE) {
@ -127,15 +128,17 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); /* int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); */
int32_t code = tsdbCacheGetLastrowH(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
if (pRow == NULL) { if (h == NULL) {
continue; continue;
} }
pRow = (STSRow*)taosLRUCacheValue(pr->pVnode->pTsdb->lruCache, h);
if (pRow->ts > lastKey) { if (pRow->ts > lastKey) {
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not. // appended or not.
@ -147,23 +150,29 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
internalResult = true; internalResult = true;
lastKey = pRow->ts; lastKey = pRow->ts;
} }
tsdbCacheRelease(pr->pVnode->pTsdb->lruCache, h);
} }
} else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) { } else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); /* int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); */
int32_t code = tsdbCacheGetLastrowH(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
// no data in the table of Uid // no data in the table of Uid
if (pRow == NULL) { if (h == NULL) {
continue; continue;
} }
pRow = (STSRow*)taosLRUCacheValue(pr->pVnode->pTsdb->lruCache, h);
saveOneRow(pRow, pResBlock, pr, slotIds); saveOneRow(pRow, pResBlock, pr, slotIds);
tsdbCacheRelease(pr->pVnode->pTsdb->lruCache, h);
pr->tableIndex += 1; pr->tableIndex += 1;
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;