From a63473b1cc5688c89e82542cd73a487d58bd01be Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 May 2023 18:54:32 +0800 Subject: [PATCH] refacotor: do some internal refactor. --- include/libs/executor/storageapi.h | 125 ++++++++++----------- source/libs/executor/inc/executorInt.h | 6 +- source/libs/executor/src/executor.c | 22 ++-- source/libs/executor/src/executorInt.c | 2 +- source/libs/executor/src/operator.c | 4 +- source/libs/executor/src/scanoperator.c | 84 +++++++------- source/libs/executor/src/sysscanoperator.c | 6 +- 7 files changed, 120 insertions(+), 129 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 7bcf6f8fd4..67c0eacd26 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -34,6 +34,11 @@ extern "C" { #define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST 0x8 +#define META_READER_NOLOCK 0x1 + +typedef struct SMeta SMeta; +typedef TSKEY (*GetTsFun)(void*); + typedef struct SMetaEntry { int64_t version; int8_t type; @@ -70,6 +75,32 @@ typedef struct SMetaEntry { uint8_t *pBuf; } SMetaEntry; +typedef struct SMetaReader { + int32_t flags; + void * pMeta; + SDecoder coder; + SMetaEntry me; + void * pBuf; + int32_t szBuf; + struct SStorageAPI *storageAPI; +} SMetaReader; + +typedef struct SMTbCursor { + struct TBC *pDbc; + void * pKey; + void * pVal; + int32_t kLen; + int32_t vLen; + SMetaReader mr; +} SMTbCursor; + +typedef struct SRowBuffPos { + void* pRowBuff; + void* pKey; + bool beFlushed; + bool beUsed; +} SRowBuffPos; + // int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables); // int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, // SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr); @@ -135,8 +166,6 @@ typedef struct SMetaTableInfo { char tbName[TSDB_TABLE_NAME_LEN]; } SMetaTableInfo; -typedef struct SMeta SMeta; - typedef struct SSnapContext { SMeta * pMeta; // todo remove it int64_t snapVersion; @@ -162,23 +191,18 @@ typedef struct { // int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); // bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid); // bool tqCurrentBlockConsumed(const STqReader* pReader); -// // int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); // bool tqNextBlockInWal(STqReader* pReader, const char* idstr); // bool tqNextBlockImpl(STqReader *pReader, const char* idstr); - // int32_t getMetafromSnapShot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); // SMetaTableInfo getUidfromSnapShot(SSnapContext *ctx); // int32_t setForSnapShot(SSnapContext *ctx, int64_t uid); // int32_t destroySnapContext(SSnapContext *ctx); - // SMTbCursor *metaOpenTbCursor(SMeta *pMeta); // void metaCloseTbCursor(SMTbCursor *pTbCur); // int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType); // int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType); -#define META_READER_NOLOCK 0x1 - /*-------------------------------------------------new api format---------------------------------------------------*/ // typedef int32_t (*__store_reader_(STsdbReader *pReader, const void *pTableList, int32_t num); @@ -201,7 +225,7 @@ typedef int32_t (*__store_reader_open_fn_t)(void *pVnode, SQueryTableDataCond *p int32_t numOfTables, SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly, SHashObj **pIgnoreTables); -typedef struct SStoreDataReaderFn { +typedef struct SStoreTSDReader { __store_reader_open_fn_t storeReaderOpen; void (*storeReaderClose)(); void (*setReaderId)(void *pReader, const char *pId); @@ -216,7 +240,7 @@ typedef struct SStoreDataReaderFn { void (*storeReaderGetDataBlockDistInfo)(); void (*storeReaderGetNumOfInMemRows)(); void (*storeReaderNotifyClosing)(); -} SStoreDataReaderFn; +} SStoreTSDReader; /** * int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables); @@ -226,14 +250,14 @@ int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32 SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); */ -typedef struct SStoreCachedDataReaderFn { +typedef struct SStoreCacheReader { int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr); void *(*closeReader)(void *pReader); int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, SArray *pTableUidList); void (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables); -} SStoreCachedDataReaderFn; +} SStoreCacheReader; /*------------------------------------------------------------------------------------------------------------------*/ /* @@ -259,7 +283,7 @@ SWalReader* tqGetWalReader(STqReader* pReader); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); */ // todo rename -typedef struct SStoreTqReaderFn { +typedef struct SStoreTqReader { void *(*tqReaderOpen)(); void (*tqReaderClose)(); @@ -282,7 +306,7 @@ typedef struct SStoreTqReaderFn { int32_t (*tqReaderSetSubmitMsg)(); // todo remove it void (*tqReaderNextBlockFilterOut)(); -} SStoreTqReaderFn; +} SStoreTqReader; typedef struct SStoreSnapshotFn { /* @@ -291,10 +315,10 @@ typedef struct SStoreSnapshotFn { int32_t setForSnapShot(SSnapContext *ctx, int64_t uid); int32_t destroySnapContext(SSnapContext *ctx); */ - int32_t (*storeCreateSnapshot)(); - void (*storeDestroySnapshot)(); - SMetaTableInfo (*storeSSGetTableInfo)(); - int32_t (*storeSSGetMetaInfo)(); + int32_t (*createSnapshot)(); + void (*destroySnapshot)(); + SMetaTableInfo (*getTableInfoFromSnapshot)(); + int32_t (*getMetaInfoFromSnapshot)(); } SStoreSnapshotFn; /** @@ -322,17 +346,9 @@ int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen); */ -typedef struct SMetaReader { - int32_t flags; - void * pMeta; - SDecoder coder; - SMetaEntry me; - void * pBuf; - int32_t szBuf; - struct SStorageAPI *storageAPI; -} SMetaReader; -typedef struct SStoreMetaReaderFn { + +typedef struct SStoreMetaReader { void (*initReader)(void *pReader, void *pMeta, int32_t flags); void *(*clearReader)(); @@ -341,9 +357,9 @@ typedef struct SStoreMetaReaderFn { int32_t (*getTableEntryByUid)(); int32_t (*getTableEntryByName)(); int32_t (*readerGetEntryGetUidCache)(SMetaReader *pReader, tb_uid_t uid); -} SStoreMetaReaderFn; +} SStoreMetaReader; -typedef struct SStoreMetaFn { +typedef struct SStoreMeta { /* SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); @@ -398,7 +414,7 @@ int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list); int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list); */ -} SStoreMetaFn; +} SStoreMeta; typedef struct STdbState { void* rocksdb; @@ -450,21 +466,14 @@ typedef struct SUpdateInfo { } SUpdateInfo; typedef struct { - void* iter; - void* snapshot; - void* readOpt; - void* db; -// rocksdb_iterator_t* iter; -// rocksdb_snapshot_t* snapshot; -// rocksdb_readoptions_t* readOpt; -// rocksdb_t* db; - - void* pCur; + void* iter; // rocksdb_iterator_t* iter; + void* snapshot; // rocksdb_snapshot_t* snapshot; + void* readOpt; // rocksdb_readoptions_t* readOpt; + void* db; // rocksdb_t* db; + void* pCur; int64_t number; } SStreamStateCur; -typedef TSKEY (*GetTsFun)(void*); - typedef struct SStateStore { int32_t (*streamStatePutParName)(SStreamState* pState, int64_t groupId, const char* tbname); int32_t (*streamStateGetParName)(SStreamState* pState, int64_t groupId, void** pVal); @@ -541,33 +550,15 @@ typedef struct SStateStore { } SStateStore; typedef struct SStorageAPI { - SStoreMetaFn metaFn; // todo: refactor - SStoreDataReaderFn storeReader; - SStoreMetaReaderFn metaReaderFn; - SStoreCachedDataReaderFn cacheFn; - SStoreSnapshotFn snapshotFn; - SStoreTqReaderFn tqReaderFn; - SStateStore stateStore; + SStoreMeta metaFn; // todo: refactor + SStoreTSDReader tsdReader; + SStoreMetaReader metaReaderFn; + SStoreCacheReader cacheFn; + SStoreSnapshotFn snapshotFn; + SStoreTqReader tqReaderFn; + SStateStore stateStore; } SStorageAPI; -typedef struct SMTbCursor { - struct TBC *pDbc; - void * pKey; - void * pVal; - int32_t kLen; - int32_t vLen; - SMetaReader mr; -} SMTbCursor; - -typedef struct SRowBuffPos { - void* pRowBuff; - void* pKey; - bool beFlushed; - bool beUsed; -} SRowBuffPos; - - - #ifdef __cplusplus } #endif diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 57b3bc3292..4c5ea4c242 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -208,7 +208,7 @@ typedef struct STableScanBase { SLimitInfo limitInfo; // there are more than one table list exists in one task, if only one vnode exists. STableListInfo* pTableListInfo; - SStoreDataReaderFn readerAPI; + SStoreTSDReader readerAPI; } STableScanBase; typedef struct STableScanInfo { @@ -224,7 +224,7 @@ typedef struct STableScanInfo { int8_t assignBlockUid; bool hasGroupByTag; bool countOnly; - SStoreDataReaderFn readerAPI; + SStoreTSDReader readerAPI; } STableScanInfo; typedef struct STableMergeScanInfo { @@ -372,7 +372,7 @@ typedef struct SStreamScanInfo { int8_t igCheckUpdate; int8_t igExpired; void* pState; //void - SStoreTqReaderFn readerFn; + SStoreTqReader readerFn; SStateStore stateStore; } SStreamScanInfo; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e89db5b59c..aa51b52c80 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -166,7 +166,7 @@ void doSetTaskId(SOperatorInfo* pOperator, SStorageAPI *pAPI) { if (pStreamScanInfo->pTableScanOp != NULL) { STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info; if (pScanInfo->base.dataReader != NULL) { - pAPI->storeReader.setReaderId(pScanInfo->base.dataReader, pTaskInfo->id.str); + pAPI->tsdReader.setReaderId(pScanInfo->base.dataReader, pTaskInfo->id.str); } } } else { @@ -1087,7 +1087,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if (pOffset->type == TMQ_OFFSET__LOG) { // todo refactor: move away - pTaskInfo->storageAPI.storeReader.storeReaderClose(pScanBaseInfo->dataReader); + pTaskInfo->storageAPI.tsdReader.storeReaderClose(pScanBaseInfo->dataReader); pScanBaseInfo->dataReader = NULL; ASSERT(0); @@ -1148,7 +1148,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pScanInfo->scanTimes = 0; if (pScanBaseInfo->dataReader == NULL) { - int32_t code = pTaskInfo->storageAPI.storeReader.storeReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, + int32_t code = pTaskInfo->storageAPI.tsdReader.storeReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, pScanInfo->pResBlock, (void**) &pScanBaseInfo->dataReader, id, false, NULL); if (code != TSDB_CODE_SUCCESS) { qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id); @@ -1159,8 +1159,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s", uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); } else { - pTaskInfo->storageAPI.storeReader.storeReaderSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1); - pTaskInfo->storageAPI.storeReader.storeReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond); + pTaskInfo->storageAPI.tsdReader.storeReaderSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1); + pTaskInfo->storageAPI.tsdReader.storeReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond); qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s", uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); } @@ -1182,14 +1182,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT SOperatorInfo* p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id); STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo; - if (pAPI->snapshotFn.storeCreateSnapshot(sContext, pOffset->uid) != 0) { + if (pAPI->snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) { qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id); terrno = TSDB_CODE_PAR_INTERNAL_ERROR; return -1; } - SMetaTableInfo mtInfo = pTaskInfo->storageAPI.snapshotFn.storeSSGetTableInfo(sContext); - pTaskInfo->storageAPI.storeReader.storeReaderClose(pInfo->dataReader); + SMetaTableInfo mtInfo = pTaskInfo->storageAPI.snapshotFn.getTableInfoFromSnapshot(sContext); + pTaskInfo->storageAPI.tsdReader.storeReaderClose(pInfo->dataReader); pInfo->dataReader = NULL; cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); @@ -1207,7 +1207,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); int32_t size = tableListGetSize(pTableListInfo); - pTaskInfo->storageAPI.storeReader.storeReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, (void**) &pInfo->dataReader, NULL, + pTaskInfo->storageAPI.tsdReader.storeReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, (void**) &pInfo->dataReader, NULL, false, NULL); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); @@ -1219,7 +1219,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; - if (pTaskInfo->storageAPI.snapshotFn.storeCreateSnapshot(sContext, pOffset->uid) != 0) { + if (pTaskInfo->storageAPI.snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) { qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); terrno = TSDB_CODE_PAR_INTERNAL_ERROR; return -1; @@ -1228,7 +1228,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT id); } else if (pOffset->type == TMQ_OFFSET__LOG) { SStreamRawScanInfo* pInfo = pOperator->info; - pTaskInfo->storageAPI.storeReader.storeReaderClose(pInfo->dataReader); + pTaskInfo->storageAPI.tsdReader.storeReaderClose(pInfo->dataReader); pInfo->dataReader = NULL; qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id); } diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index c11c5be1ae..240803e243 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1195,7 +1195,7 @@ void qStreamCloseTsdbReader(void* task) { qDebug("wait for the reader stopping"); } - pTaskInfo->storageAPI.storeReader.storeReaderClose(pTSInfo->base.dataReader); + pTaskInfo->storageAPI.tsdReader.storeReaderClose(pTSInfo->base.dataReader); pTSInfo->base.dataReader = NULL; // restore the status, todo refactor. diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 1b85aedc1e..8d62b7a65f 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -239,7 +239,7 @@ static ERetType doStopDataReader(SOperatorInfo* pOperator, STraverParam* pParam, STableScanInfo* pInfo = pOperator->info; if (pInfo->base.dataReader != NULL) { - pAPI->storeReader.storeReaderNotifyClosing(pInfo->base.dataReader); + pAPI->tsdReader.storeReaderNotifyClosing(pInfo->base.dataReader); } return OPTR_FN_RET_ABORT; } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { @@ -248,7 +248,7 @@ static ERetType doStopDataReader(SOperatorInfo* pOperator, STraverParam* pParam, if (pInfo->pTableScanOp != NULL) { STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; if (pTableScanInfo != NULL && pTableScanInfo->base.dataReader != NULL) { - pAPI->storeReader.storeReaderNotifyClosing(pTableScanInfo->base.dataReader); + pAPI->tsdReader.storeReaderNotifyClosing(pTableScanInfo->base.dataReader); } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ac8ca675ce..4bed2d3a5d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -267,7 +267,7 @@ static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SStorageAPI* pAPI = &pTaskInfo->storageAPI; bool allColumnsHaveAgg = true; - int32_t code = pAPI->storeReader.storeReaderRetrieveBlockSMA(pTableScanInfo->dataReader, pBlock, &allColumnsHaveAgg); + int32_t code = pAPI->tsdReader.storeReaderRetrieveBlockSMA(pTableScanInfo->dataReader, pBlock, &allColumnsHaveAgg); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -350,7 +350,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pCost->filterOutBlocks += 1; pCost->totalRows += pBlock->info.rows; - pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader); + pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader); return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64, @@ -358,7 +358,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca pBlockInfo->id.uid); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); pCost->skipBlocks += 1; - pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader); + pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader); return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) { pCost->loadBlockStatis += 1; @@ -368,7 +368,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); - pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader); + pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader); return TSDB_CODE_SUCCESS; } else { qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo)); @@ -390,7 +390,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca pCost->filterOutBlocks += 1; (*status) = FUNC_DATA_REQUIRED_FILTEROUT; - pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader); + pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader); return TSDB_CODE_SUCCESS; } } @@ -405,7 +405,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pCost->skipBlocks += 1; - pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader); + pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->dataReader); STableScanInfo* p1 = pOperator->info; if (taosHashGetSize(p1->pIgnoreTables) == taosArrayGetSize(p1->base.pTableListInfo->pTableList)) { @@ -419,7 +419,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca pCost->totalCheckedRows += pBlock->info.rows; pCost->loadBlocks += 1; - SSDataBlock* p = pAPI->storeReader.storeReaderRetrieveDataBlock(pTableScanInfo->dataReader, NULL); + SSDataBlock* p = pAPI->tsdReader.storeReaderRetrieveDataBlock(pTableScanInfo->dataReader, NULL); if (p == NULL) { return terrno; } @@ -693,9 +693,9 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { int64_t st = taosGetTimestampUs(); while (true) { - code = pAPI->storeReader.storeReaderNextDataBlock(pTableScanInfo->base.dataReader, &hasNext); + code = pAPI->tsdReader.storeReaderNextDataBlock(pTableScanInfo->base.dataReader, &hasNext); if (code) { - pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->base.dataReader); + pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->base.dataReader); T_LONG_JMP(pTaskInfo->env, code); } @@ -704,12 +704,12 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { } if (isTaskKilled(pTaskInfo)) { - pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->base.dataReader); + pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->base.dataReader); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } if (pOperator->status == OP_EXEC_DONE) { - pAPI->storeReader.storeReaderReleaseDataBlock(pTableScanInfo->base.dataReader); + pAPI->tsdReader.storeReaderReleaseDataBlock(pTableScanInfo->base.dataReader); break; } @@ -774,7 +774,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo)); // do prepare for the next round table scan operation - pAPI->storeReader.storeReaderResetStatus(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); + pAPI->tsdReader.storeReaderResetStatus(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); } } @@ -782,7 +782,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { if (pTableScanInfo->scanTimes < total) { if (pTableScanInfo->base.cond.order == TSDB_ORDER_ASC) { prepareForDescendingScan(&pTableScanInfo->base, pOperator->exprSupp.pCtx, 0); - pAPI->storeReader.storeReaderResetStatus(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); + pAPI->tsdReader.storeReaderResetStatus(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo)); } @@ -800,7 +800,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { pTableScanInfo->base.scanFlag = MAIN_SCAN; qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo)); - pAPI->storeReader.storeReaderResetStatus(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); + pAPI->tsdReader.storeReaderResetStatus(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); } } } @@ -839,11 +839,11 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); taosRUnLockLatch(&pTaskInfo->lock); - pAPI->storeReader.storeReaderSetTableList(pInfo->base.dataReader, &tInfo, 1); + pAPI->tsdReader.storeReaderSetTableList(pInfo->base.dataReader, &tInfo, 1); qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables, pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo)); - pAPI->storeReader.storeReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); + pAPI->tsdReader.storeReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; } } else { // scan table group by group sequentially @@ -858,7 +858,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); ASSERT(pInfo->base.dataReader == NULL); - int32_t code = pAPI->storeReader.storeReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, + int32_t code = pAPI->tsdReader.storeReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly, &pInfo->pIgnoreTables); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); @@ -887,8 +887,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STableKeyInfo* pList = NULL; tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); - pAPI->storeReader.storeReaderSetTableList(pInfo->base.dataReader, pList, num); - pAPI->storeReader.storeReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); + pAPI->tsdReader.storeReaderSetTableList(pInfo->base.dataReader, pList, num); + pAPI->tsdReader.storeReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; result = doGroupedTableScan(pOperator); @@ -910,7 +910,7 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr return 0; } -static void destroyTableScanBase(STableScanBase* pBase, SStoreDataReaderFn* pAPI) { +static void destroyTableScanBase(STableScanBase* pBase, SStoreTSDReader* pAPI) { cleanupQueryTableDataCond(&pBase->cond); pAPI->storeReaderClose(pBase->dataReader); @@ -1094,7 +1094,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU SSDataBlock* pBlock = pTableScanInfo->pResBlock; STsdbReader* pReader = NULL; - int32_t code = pAPI->storeReader.storeReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, + int32_t code = pAPI->tsdReader.storeReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, (void**)&pReader, GET_TASKID(pTaskInfo), false, NULL); if (code != TSDB_CODE_SUCCESS) { terrno = code; @@ -1103,7 +1103,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU } bool hasNext = false; - code = pAPI->storeReader.storeReaderNextDataBlock(pReader, &hasNext); + code = pAPI->tsdReader.storeReaderNextDataBlock(pReader, &hasNext); if (code != TSDB_CODE_SUCCESS) { terrno = code; T_LONG_JMP(pTaskInfo->env, code); @@ -1111,12 +1111,12 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU } if (hasNext) { - /*SSDataBlock* p = */ pAPI->storeReader.storeReaderRetrieveDataBlock(pReader, NULL); + /*SSDataBlock* p = */ pAPI->tsdReader.storeReaderRetrieveDataBlock(pReader, NULL); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); } - pAPI->storeReader.storeReaderClose(pReader); + pAPI->tsdReader.storeReaderClose(pReader); qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64 ", suid:%" PRIu64, pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid); @@ -1657,7 +1657,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { return pResult; } STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - pAPI->storeReader.storeReaderClose(pTSInfo->base.dataReader); + pAPI->tsdReader.storeReaderClose(pTSInfo->base.dataReader); pTSInfo->base.dataReader = NULL; qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1); @@ -1821,7 +1821,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2; } - pAPI->storeReader.storeReaderClose(pTSInfo->base.dataReader); + pAPI->tsdReader.storeReaderClose(pTSInfo->base.dataReader); pTSInfo->base.dataReader = NULL; pInfo->pTableScanOp->status = OP_OPENED; @@ -1901,7 +1901,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - pAPI->storeReader.storeReaderClose(pTSInfo->base.dataReader); + pAPI->tsdReader.storeReaderClose(pTSInfo->base.dataReader); pTSInfo->base.dataReader = NULL; @@ -2162,20 +2162,20 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { bool hasNext = false; if (pInfo->dataReader) { - code = pAPI->storeReader.storeReaderNextDataBlock(pInfo->dataReader, &hasNext); + code = pAPI->tsdReader.storeReaderNextDataBlock(pInfo->dataReader, &hasNext); if (code) { - pAPI->storeReader.storeReaderReleaseDataBlock(pInfo->dataReader); + pAPI->tsdReader.storeReaderReleaseDataBlock(pInfo->dataReader); T_LONG_JMP(pTaskInfo->env, code); } } if (pInfo->dataReader && hasNext) { if (isTaskKilled(pTaskInfo)) { - pAPI->storeReader.storeReaderReleaseDataBlock(pInfo->dataReader); + pAPI->tsdReader.storeReaderReleaseDataBlock(pInfo->dataReader); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } - SSDataBlock* pBlock = pAPI->storeReader.storeReaderRetrieveDataBlock(pInfo->dataReader, NULL); + SSDataBlock* pBlock = pAPI->tsdReader.storeReaderRetrieveDataBlock(pInfo->dataReader, NULL); if (pBlock == NULL) { T_LONG_JMP(pTaskInfo->env, terrno); } @@ -2185,7 +2185,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { return pBlock; } - SMetaTableInfo mtInfo = pAPI->snapshotFn.storeSSGetTableInfo(pInfo->sContext); + SMetaTableInfo mtInfo = pAPI->snapshotFn.getTableInfoFromSnapshot(pInfo->sContext); STqOffsetVal offset = {0}; if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal qDebug("tmqsnap read snapshot done, change to get data from wal"); @@ -2203,7 +2203,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { int32_t dataLen = 0; int16_t type = 0; int64_t uid = 0; - if (pAPI->snapshotFn.storeSSGetMetaInfo(sContext, &data, &dataLen, &type, &uid) < 0) { + if (pAPI->snapshotFn.getMetaInfoFromSnapshot(sContext, &data, &dataLen, &type, &uid) < 0) { qError("tmqsnap getMetafromSnapShot error"); taosMemoryFreeClear(data); return NULL; @@ -2227,8 +2227,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { static void destroyRawScanOperatorInfo(void* param) { SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param; - pRawScan->pAPI->storeReader.storeReaderClose(pRawScan->dataReader); - pRawScan->pAPI->snapshotFn.storeDestroySnapshot(pRawScan->sContext); + pRawScan->pAPI->tsdReader.storeReaderClose(pRawScan->dataReader); + pRawScan->pAPI->snapshotFn.destroySnapshot(pRawScan->sContext); tableListDestroy(pRawScan->pTableListInfo); taosMemoryFree(pRawScan); } @@ -2662,7 +2662,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SReadHandle* pHandle = &pInfo->base.readHandle; if (NULL == source->dataReader || !source->multiReader) { - code = pAPI->storeReader.storeReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL); + code = pAPI->tsdReader.storeReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL); if (code != 0) { T_LONG_JMP(pTaskInfo->env, code); } @@ -2674,9 +2674,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { qTrace("tsdb/read-table-data: %p, enter next reader", reader); while (true) { - code = pAPI->storeReader.storeReaderNextDataBlock(reader, &hasNext); + code = pAPI->tsdReader.storeReaderNextDataBlock(reader, &hasNext); if (code != 0) { - pAPI->storeReader.storeReaderReleaseDataBlock(reader); + pAPI->tsdReader.storeReaderReleaseDataBlock(reader); pInfo->base.dataReader = NULL; T_LONG_JMP(pTaskInfo->env, code); } @@ -2686,7 +2686,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } if (isTaskKilled(pTaskInfo)) { - pAPI->storeReader.storeReaderReleaseDataBlock(reader); + pAPI->tsdReader.storeReaderReleaseDataBlock(reader); pInfo->base.dataReader = NULL; T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } @@ -2726,7 +2726,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { qTrace("tsdb/read-table-data: %p, close reader", reader); if (!source->multiReader) { - pAPI->storeReader.storeReaderClose(pInfo->base.dataReader); + pAPI->tsdReader.storeReaderClose(pInfo->base.dataReader); source->dataReader = NULL; } pInfo->base.dataReader = NULL; @@ -2734,7 +2734,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } if (!source->multiReader) { - pAPI->storeReader.storeReaderClose(pInfo->base.dataReader); + pAPI->tsdReader.storeReaderClose(pInfo->base.dataReader); source->dataReader = NULL; } pInfo->base.dataReader = NULL; @@ -2853,7 +2853,7 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { for (int32_t i = 0; i < numOfTable; ++i) { STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); blockDataDestroy(param->inputBlock); - pAPI->storeReader.storeReaderClose(param->dataReader); + pAPI->tsdReader.storeReaderClose(param->dataReader); param->dataReader = NULL; } taosArrayClear(pInfo->sortSourceParams); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 5bbfb188ed..0e2ea77b86 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2198,7 +2198,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - pAPI->storeReader.storeReaderGetDataBlockDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); + pAPI->tsdReader.storeReaderGetDataBlockDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); blockDistInfo.numOfInmemRows = (int32_t) pAPI->metaFn.getNumOfRowsInMem(pBlockScanInfo->pHandle); SSDataBlock* pBlock = pBlockScanInfo->pResBlock; @@ -2229,7 +2229,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { static void destroyBlockDistScanOperatorInfo(void* param) { SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param; blockDataDestroy(pDistInfo->pResBlock); - pDistInfo->readHandle.api.storeReader.storeReaderClose(pDistInfo->pHandle); + pDistInfo->readHandle.api.tsdReader.storeReaderClose(pDistInfo->pHandle); tableListDestroy(pDistInfo->pTableListInfo); taosMemoryFreeClear(param); } @@ -2284,7 +2284,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi size_t num = tableListGetSize(pTableListInfo); void* pList = tableListGetInfo(pTableListInfo, 0); - code = readHandle->api.storeReader.storeReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, (void**)&pInfo->pHandle, pTaskInfo->id.str, false, NULL); + code = readHandle->api.tsdReader.storeReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, (void**)&pInfo->pHandle, pTaskInfo->id.str, false, NULL); cleanupQueryTableDataCond(&cond); if (code != 0) { goto _error;