refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2022-12-23 17:45:12 +08:00
parent a4213aedf0
commit 224f5a72c2
5 changed files with 26 additions and 47 deletions

View File

@ -185,7 +185,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void **pReader);
uint64_t suid, void **pReader, const char* idstr);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);

View File

@ -723,13 +723,13 @@ typedef struct SCacheRowsReader {
int32_t numOfCols;
int32_t type;
int32_t tableIndex; // currently returned result tables
STableKeyInfo *pTableList; // table id list
int32_t numOfTables;
SSttBlockLoadInfo *pLoadInfo;
STsdbReadSnap *pReadSnap;
SDataFReader *pDataFReader;
SDataFReader *pDataFReaderLast;
const char *idstr;
} SCacheRowsReader;
typedef struct {
@ -752,8 +752,6 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
size_t tsdbCacheGetCapacity(SVnode *pVnode);
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
// ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1;

View File

@ -1406,30 +1406,6 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *
return code;
}
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pTSchema) {
int32_t code = 0;
int16_t nCol = taosArrayGetSize(pLastArray);
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLastArray, iCol);
SColVal *tColVal = &tTsVal->colVal;
taosArrayPush(pColArray, tColVal);
}
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
if (code) goto _err;
taosArrayDestroy(pColArray);
return code;
_err:
taosArrayDestroy(pColArray);
return code;
}
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
int32_t code = 0;
char key[32] = {0};

View File

@ -20,9 +20,8 @@
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
void** pRes) {
ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
void** pRes, const char* idStr) {
int32_t numOfRows = pBlock->info.rows;
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
@ -65,9 +64,7 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
}
pBlock->info.rows += allNullRow ? 0 : 1;
} else {
ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW));
} else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
@ -94,11 +91,16 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
}
pBlock->info.rows += 1;
} else {
tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
return TSDB_CODE_INVALID_PARA;
}
return TSDB_CODE_SUCCESS;
}
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void** pReader) {
uint64_t suid, void** pReader, const char* idstr) {
*pReader = NULL;
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
if (p == NULL) {
@ -142,6 +144,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
return TSDB_CODE_OUT_OF_MEMORY;
}
p->idstr = taosMemoryStrDup(idstr);
*pReader = p;
return TSDB_CODE_SUCCESS;
}
@ -160,6 +164,7 @@ void* tsdbCacherowsReaderClose(void* pReader) {
destroyLastBlockLoadInfo(p->pLoadInfo);
taosMemoryFree((void*) p->idstr);
taosMemoryFree(pReader);
return NULL;
}
@ -308,7 +313,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
if (hasRes) {
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes);
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
}
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
@ -323,7 +328,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
continue;
}
saveOneRow(pRow, pResBlock, pr, slotIds, pRes);
saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr);
// TODO reset the pRes
taosArrayPush(pTableUidList, &pKeyInfo->uid);

View File

@ -90,7 +90,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
uint64_t suid = tableListGetSuid(pTableList);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader);
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -216,7 +216,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader);
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str);
taosArrayClear(pInfo->pUidList);
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);