Merge pull request #21511 from taosdata/refact/fillhistory
refactor: do some internal refactor.
This commit is contained in:
commit
2ac83ec9c7
|
@ -103,63 +103,6 @@ typedef struct SRowBuffPos {
|
|||
bool beUsed;
|
||||
} SRowBuffPos;
|
||||
|
||||
// int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables);
|
||||
// int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||
// SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
|
||||
// int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
|
||||
// SArray *pTableUids);
|
||||
// void *tsdbCacherowsReaderClose(void *pReader);
|
||||
// int32_t tsdbGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
|
||||
|
||||
// int32_t tsdbReaderOpen(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
||||
// SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly, SHashObj** pIgnoreTables);
|
||||
// int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
|
||||
// void tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
|
||||
// void tsdbReaderClose(STsdbReader *pReader);
|
||||
// int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
|
||||
// int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
|
||||
// void tsdbReleaseDataBlock(STsdbReader *pReader);
|
||||
// SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||
// int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
||||
// int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||
// int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
||||
// void *tsdbGetIdx(void *pMeta);
|
||||
// void *tsdbGetIvtIdx(void *pMeta);
|
||||
// uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader);
|
||||
// void tsdbReaderSetCloseFlag(STsdbReader *pReader);
|
||||
// int64_t tsdbGetLastTimestamp(void* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr);
|
||||
|
||||
// int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
|
||||
// int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
|
||||
// int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
|
||||
// void *vnodeGetIdx(void *pVnode);
|
||||
// void *vnodeGetIvtIdx(void *pVnode);
|
||||
|
||||
// void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
|
||||
// void metaReaderReleaseLock(SMetaReader *pReader);
|
||||
// void metaReaderClear(SMetaReader *pReader);
|
||||
// int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||
// int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
|
||||
// int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
|
||||
// int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList);
|
||||
// int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList);
|
||||
// const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
|
||||
// int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
|
||||
//
|
||||
// int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName);
|
||||
// int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
|
||||
// int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
|
||||
// bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
|
||||
// int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
|
||||
// bool *acquired);
|
||||
// int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
|
||||
// int32_t payloadLen, double selectivityRatio);
|
||||
// int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid);
|
||||
// tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
|
||||
// int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
|
||||
// int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
|
||||
// int32_t payloadLen);
|
||||
|
||||
// tq
|
||||
typedef struct SMetaTableInfo {
|
||||
int64_t suid;
|
||||
|
@ -201,64 +144,39 @@ typedef struct {
|
|||
// int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
|
||||
// int32_t destroySnapContext(SSnapContext *ctx);
|
||||
|
||||
// clang-format off
|
||||
/*-------------------------------------------------new api format---------------------------------------------------*/
|
||||
|
||||
// typedef int32_t (*__store_reader_(STsdbReader *pReader, const void *pTableList, int32_t num);
|
||||
// typedef void (*tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
|
||||
// typedef void (*tsdbReaderClose(STsdbReader *pReader);
|
||||
// typedef int32_t (*tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
|
||||
// typedef int32_t (*tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
|
||||
// typedef void (*tsdbReleaseDataBlock(STsdbReader *pReader);
|
||||
// typedef SSDataBlock * (*tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||
// typedef int32_t (*tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
||||
// typedef int32_t (*tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||
// typedef int64_t (*tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
||||
// typedef void * (*tsdbGetIdx(void *pMeta);
|
||||
// typedef void * (*tsdbGetIvtIdx(void *pMeta);
|
||||
// typedef uint64_t (*tsdbGetReaderMaxVersion(STsdbReader *pReader);
|
||||
// typedef void (*tsdbReaderSetCloseFlag(STsdbReader *pReader);
|
||||
// typedef int64_t (*tsdbGetLastTimestamp(void* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr);
|
||||
|
||||
typedef int32_t (*__store_reader_open_fn_t)(void *pVnode, SQueryTableDataCond *pCond, void *pTableList,
|
||||
int32_t numOfTables, SSDataBlock *pResBlock, void **ppReader,
|
||||
const char *idstr, bool countOnly, SHashObj **pIgnoreTables);
|
||||
|
||||
typedef struct TsdReader {
|
||||
__store_reader_open_fn_t tsdReaderOpen;
|
||||
void (*tsdReaderClose)();
|
||||
void (*tsdSetReaderTaskId)(void *pReader, const char *pId);
|
||||
int32_t (*tsdSetQueryTableList)();
|
||||
int32_t (*tsdNextDataBlock)();
|
||||
int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
|
||||
SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly,
|
||||
SHashObj** pIgnoreTables);
|
||||
void (*tsdReaderClose)();
|
||||
void (*tsdSetReaderTaskId)(void *pReader, const char *pId);
|
||||
int32_t (*tsdSetQueryTableList)();
|
||||
int32_t (*tsdNextDataBlock)();
|
||||
|
||||
int32_t (*tsdReaderRetrieveBlockSMAInfo)();
|
||||
int32_t (*tsdReaderRetrieveBlockSMAInfo)();
|
||||
SSDataBlock *(*tsdReaderRetrieveDataBlock)();
|
||||
|
||||
void (*tsdReaderReleaseDataBlock)();
|
||||
void (*tsdReaderReleaseDataBlock)();
|
||||
|
||||
int32_t (*tsdReaderResetStatus)();
|
||||
int32_t (*tsdReaderGetDataBlockDistInfo)();
|
||||
int64_t (*tsdReaderGetNumOfInMemRows)();
|
||||
void (*tsdReaderNotifyClosing)();
|
||||
int32_t (*tsdReaderResetStatus)();
|
||||
int32_t (*tsdReaderGetDataBlockDistInfo)();
|
||||
int64_t (*tsdReaderGetNumOfInMemRows)();
|
||||
void (*tsdReaderNotifyClosing)();
|
||||
} TsdReader;
|
||||
|
||||
|
||||
/**
|
||||
* int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables);
|
||||
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
|
||||
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
|
||||
SArray *pTableUids);
|
||||
void *tsdbCacherowsReaderClose(void *pReader);
|
||||
*/
|
||||
typedef struct SStoreCacheReader {
|
||||
int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
|
||||
void *(*closeReader)(void *pReader);
|
||||
int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
|
||||
SArray *pTableUidList);
|
||||
int32_t (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables);
|
||||
int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
|
||||
void *(*closeReader)(void *pReader);
|
||||
int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
|
||||
SArray *pTableUidList);
|
||||
int32_t (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables);
|
||||
} SStoreCacheReader;
|
||||
|
||||
// clang-format on
|
||||
|
||||
/*------------------------------------------------------------------------------------------------------------------*/
|
||||
/*
|
||||
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
|
||||
|
@ -309,16 +227,10 @@ typedef struct SStoreTqReader {
|
|||
} SStoreTqReader;
|
||||
|
||||
typedef struct SStoreSnapshotFn {
|
||||
/*
|
||||
int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
|
||||
SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx);
|
||||
int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
|
||||
int32_t destroySnapContext(SSnapContext *ctx);
|
||||
*/
|
||||
int32_t (*createSnapshot)();
|
||||
int32_t (*destroySnapshot)();
|
||||
SMetaTableInfo (*getMetaTableInfoFromSnapshot)();
|
||||
int32_t (*getTableInfoFromSnapshot)();
|
||||
int32_t (*createSnapshot)(SSnapContext *ctx, int64_t uid);
|
||||
int32_t (*destroySnapshot)(SSnapContext *ctx);
|
||||
SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx);
|
||||
int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid);
|
||||
} SStoreSnapshotFn;
|
||||
|
||||
/**
|
||||
|
@ -327,12 +239,10 @@ void metaReaderReleaseLock(SMetaReader *pReader);
|
|||
void metaReaderClear(SMetaReader *pReader);
|
||||
int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||
int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
|
||||
int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
|
||||
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList);
|
||||
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
|
||||
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
|
||||
|
||||
int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName);
|
||||
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
|
||||
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
|
||||
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
|
||||
|
@ -340,51 +250,40 @@ int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *k
|
|||
bool *acquired);
|
||||
int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
|
||||
int32_t payloadLen, double selectivityRatio);
|
||||
int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid);
|
||||
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
|
||||
int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
|
||||
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
|
||||
int32_t payloadLen);
|
||||
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen);
|
||||
*/
|
||||
|
||||
typedef struct SStoreMeta {
|
||||
SMTbCursor *(*openTableMetaCursor)(void *pVnode); // metaOpenTbCursor
|
||||
void (*closeTableMetaCursor)(SMTbCursor *pTbCur); // metaCloseTbCursor
|
||||
int32_t (*cursorNext)(); // metaTbCursorNext
|
||||
int32_t (*cursorPrev)(); // metaTbCursorPrev
|
||||
SMTbCursor *(*openTableMetaCursor)(void *pVnode); // metaOpenTbCursor
|
||||
void (*closeTableMetaCursor)(SMTbCursor *pTbCur); // metaCloseTbCursor
|
||||
int32_t (*cursorNext)(SMTbCursor *pTbCur, ETableType jumpTableType); // metaTbCursorNext
|
||||
int32_t (*cursorPrev)(SMTbCursor *pTbCur, ETableType jumpTableType); // metaTbCursorPrev
|
||||
|
||||
int32_t (*getTableTags)(void *pVnode, uint64_t suid, SArray *uidList);
|
||||
int32_t (*getTableTagsByUid)(void *pVnode, int64_t suid, SArray *uidList);
|
||||
int32_t (*getTableTags)(void *pVnode, uint64_t suid, SArray *uidList);
|
||||
int32_t (*getTableTagsByUid)(void *pVnode, int64_t suid, SArray *uidList);
|
||||
const void *(*extractTagVal)(const void *tag, int16_t type, STagVal *tagVal); // todo remove it
|
||||
|
||||
int32_t (*getTableUidByName)(void *pVnode, char *tbName, uint64_t *uid);
|
||||
int32_t (*getTableTypeByName)(void *pVnode, char *tbName, ETableType *tbType);
|
||||
int32_t (*getTableNameByUid)(void *pVnode, uint64_t uid, char *tbName);
|
||||
bool (*isTableExisted)(void *pVnode, tb_uid_t uid);
|
||||
int32_t (*getTableUidByName)(void *pVnode, char *tbName, uint64_t *uid);
|
||||
int32_t (*getTableTypeByName)(void *pVnode, char *tbName, ETableType *tbType);
|
||||
int32_t (*getTableNameByUid)(void *pVnode, uint64_t uid, char *tbName);
|
||||
bool (*isTableExisted)(void *pVnode, tb_uid_t uid);
|
||||
|
||||
/**
|
||||
* int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
|
||||
int32_t payloadLen, double selectivityRatio);
|
||||
int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid);
|
||||
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
|
||||
int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
|
||||
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
|
||||
int32_t payloadLen);
|
||||
*/
|
||||
void (*getCachedTableList)();
|
||||
void (*putTableListIntoCache)();
|
||||
int32_t (*metaGetCachedTbGroup)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
|
||||
int32_t (*metaPutTbGroupToCache)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
void *(*storeGetIndexInfo)();
|
||||
void *(*getInvertIndex)(void* pVnode);
|
||||
int32_t (*getChildTableList)(void *pVnode, int64_t suid, SArray *list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
|
||||
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList
|
||||
void *storeGetVersionRange;
|
||||
void *storeGetLastTimestamp;
|
||||
int32_t (*getCachedTableList)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes);
|
||||
int32_t (*putCachedTableList)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen, double selectivityRatio);
|
||||
|
||||
int32_t (*getTableSchema)(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); // tsdbGetTableSchema
|
||||
void *(*storeGetIndexInfo)();
|
||||
void *(*getInvertIndex)(void* pVnode);
|
||||
int32_t (*getChildTableList)(void *pVnode, int64_t suid, SArray *list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
|
||||
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList
|
||||
void *storeGetVersionRange;
|
||||
void *storeGetLastTimestamp;
|
||||
|
||||
int32_t (*getTableSchema)(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); // tsdbGetTableSchema
|
||||
|
||||
// db name, vgId, numOfTables, numOfSTables
|
||||
int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
|
||||
|
|
|
@ -100,18 +100,11 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs
|
|||
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);
|
||||
|
||||
// meta
|
||||
typedef struct SMeta SMeta; // todo: remove
|
||||
typedef struct SMetaReader SMetaReader;
|
||||
typedef struct SMetaEntry SMetaEntry;
|
||||
|
||||
#define META_READER_NOLOCK 0x1
|
||||
|
||||
void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI);
|
||||
void metaReaderReleaseLock(SMetaReader *pReader);
|
||||
void metaReaderClear(SMetaReader *pReader);
|
||||
int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||
int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
|
||||
int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
|
||||
int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *uidList);
|
||||
int32_t metaGetTableTagsByUids(void* pVnode, int64_t suid, SArray *uidList);
|
||||
int32_t metaReadNext(SMetaReader *pReader);
|
||||
|
@ -122,37 +115,18 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName);
|
|||
int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid);
|
||||
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
|
||||
bool metaIsTableExist(void* pVnode, tb_uid_t uid);
|
||||
int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
|
||||
int32_t metaGetCachedTableUidList(void *pVnode, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
|
||||
bool *acquired);
|
||||
int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
|
||||
int32_t metaUidFilterCachePut(void *pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
|
||||
int32_t payloadLen, double selectivityRatio);
|
||||
int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid);
|
||||
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
|
||||
int32_t metaTbGroupCacheClear(SMeta *pMeta, uint64_t suid);
|
||||
int32_t metaGetCachedTbGroup(SMeta *pMeta, tb_uid_t suid, const uint8_t *pKey, int32_t keyLen, SArray **pList);
|
||||
int32_t metaPutTbGroupToCache(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
|
||||
int32_t metaGetCachedTbGroup(void *pVnode, tb_uid_t suid, const uint8_t *pKey, int32_t keyLen, SArray **pList);
|
||||
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
|
||||
int32_t payloadLen);
|
||||
|
||||
int64_t metaGetTbNum(SMeta *pMeta);
|
||||
int64_t metaGetNtbNum(SMeta *pMeta);
|
||||
//typedef struct {
|
||||
// int64_t uid;
|
||||
// int64_t ctbNum;
|
||||
//} SMetaStbStats;
|
||||
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables);
|
||||
|
||||
#if 1 // refact APIs below (TODO)
|
||||
typedef SVCreateTbReq STbCfg;
|
||||
typedef SVCreateTSmaReq SSmaCfg;
|
||||
|
||||
typedef struct SMTbCursor SMTbCursor;
|
||||
|
||||
SMTbCursor *metaOpenTbCursor(void *pVnode);
|
||||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||
|
||||
#endif
|
||||
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables);
|
||||
|
||||
// tsdb
|
||||
typedef struct STsdbReader STsdbReader;
|
||||
|
@ -168,8 +142,8 @@ typedef struct STsdbReader STsdbReader;
|
|||
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
|
||||
#define CACHESCAN_RETRIEVE_LAST 0x8
|
||||
|
||||
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
||||
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly,
|
||||
int32_t tsdbReaderOpen(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
||||
SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly,
|
||||
SHashObj **pIgnoreTables);
|
||||
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
|
||||
void tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
|
||||
|
@ -201,32 +175,11 @@ size_t tsdbCacheGetUsage(SVnode *pVnode);
|
|||
int32_t tsdbCacheGetElems(SVnode *pVnode);
|
||||
|
||||
//// tq
|
||||
//typedef struct SMetaTableInfo {
|
||||
// int64_t suid;
|
||||
// int64_t uid;
|
||||
// SSchemaWrapper *schema;
|
||||
// char tbName[TSDB_TABLE_NAME_LEN];
|
||||
//} SMetaTableInfo;
|
||||
|
||||
typedef struct SIdInfo {
|
||||
int64_t version;
|
||||
int32_t index;
|
||||
} SIdInfo;
|
||||
|
||||
//typedef struct SSnapContext {
|
||||
// SMeta *pMeta;
|
||||
// int64_t snapVersion;
|
||||
// TBC *pCur;
|
||||
// int64_t suid;
|
||||
// int8_t subType;
|
||||
// SHashObj *idVersion;
|
||||
// SHashObj *suidInfo;
|
||||
// SArray *idList;
|
||||
// int32_t index;
|
||||
// bool withMeta;
|
||||
// bool queryMeta; // true-get meta, false-get data
|
||||
//} SSnapContext;
|
||||
|
||||
typedef struct STqReader {
|
||||
SPackedData msg;
|
||||
SSubmitReq2 submit;
|
||||
|
@ -345,59 +298,6 @@ struct SVnodeCfg {
|
|||
#define TABLE_ROLLUP_ON ((int8_t)0x1)
|
||||
#define TABLE_IS_ROLLUP(FLG) (((FLG) & (TABLE_ROLLUP_ON)) != 0)
|
||||
#define TABLE_SET_ROLLUP(FLG) ((FLG) |= TABLE_ROLLUP_ON)
|
||||
//struct SMetaEntry {
|
||||
// int64_t version;
|
||||
// int8_t type;
|
||||
// int8_t flags; // TODO: need refactor?
|
||||
// tb_uid_t uid;
|
||||
// char *name;
|
||||
// union {
|
||||
// struct {
|
||||
// SSchemaWrapper schemaRow;
|
||||
// SSchemaWrapper schemaTag;
|
||||
// SRSmaParam rsmaParam;
|
||||
// } stbEntry;
|
||||
// struct {
|
||||
// int64_t ctime;
|
||||
// int32_t ttlDays;
|
||||
// int32_t commentLen;
|
||||
// char *comment;
|
||||
// tb_uid_t suid;
|
||||
// uint8_t *pTags;
|
||||
// } ctbEntry;
|
||||
// struct {
|
||||
// int64_t ctime;
|
||||
// int32_t ttlDays;
|
||||
// int32_t commentLen;
|
||||
// char *comment;
|
||||
// int32_t ncid; // next column id
|
||||
// SSchemaWrapper schemaRow;
|
||||
// } ntbEntry;
|
||||
// struct {
|
||||
// STSma *tsma;
|
||||
// } smaEntry;
|
||||
// };
|
||||
//
|
||||
// uint8_t *pBuf;
|
||||
//};
|
||||
|
||||
//struct SMetaReader {
|
||||
// int32_t flags;
|
||||
// SMeta *pMeta;
|
||||
// SDecoder coder;
|
||||
// SMetaEntry me;
|
||||
// void *pBuf;
|
||||
// int32_t szBuf;
|
||||
//};
|
||||
|
||||
//struct SMTbCursor {
|
||||
// TBC *pDbc;
|
||||
// void *pKey;
|
||||
// void *pVal;
|
||||
// int32_t kLen;
|
||||
// int32_t vLen;
|
||||
// SMetaReader mr;
|
||||
//};
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -103,6 +103,17 @@ struct SQueryNode {
|
|||
_query_reseek_func_t reseek;
|
||||
};
|
||||
|
||||
#if 1 // refact APIs below (TODO)
|
||||
typedef SVCreateTbReq STbCfg;
|
||||
typedef SVCreateTSmaReq SSmaCfg;
|
||||
|
||||
SMTbCursor *metaOpenTbCursor(void *pVnode);
|
||||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||
|
||||
#endif
|
||||
|
||||
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
|
||||
void* vnodeBufPoolMallocAligned(SVBufPool* pPool, int size);
|
||||
void vnodeBufPoolFree(SVBufPool* pPool, void* p);
|
||||
|
@ -143,6 +154,9 @@ int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, in
|
|||
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
|
||||
int metaAlterCache(SMeta* pMeta, int32_t nPage);
|
||||
|
||||
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid);
|
||||
int32_t metaTbGroupCacheClear(SMeta *pMeta, uint64_t suid);
|
||||
|
||||
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
|
||||
|
||||
|
@ -477,6 +491,7 @@ struct SCompactInfo {
|
|||
|
||||
void initStorageAPI(SStorageAPI* pAPI);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -499,8 +499,9 @@ static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid,
|
|||
ASSERT(keyLen == sizeof(uint64_t) * 2);
|
||||
}
|
||||
|
||||
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
|
||||
int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
|
||||
bool* acquireRes) {
|
||||
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
|
||||
int32_t vgId = TD_VID(pMeta->pVnode);
|
||||
|
||||
// generate the composed key for LRU cache
|
||||
|
@ -603,9 +604,10 @@ static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyL
|
|||
}
|
||||
|
||||
// check both the payload size and selectivity ratio
|
||||
int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
|
||||
int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
|
||||
int32_t payloadLen, double selectivityRatio) {
|
||||
int32_t code = 0;
|
||||
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
|
||||
int32_t vgId = TD_VID(pMeta->pVnode);
|
||||
|
||||
if (selectivityRatio > tsSelectivityRatio) {
|
||||
|
@ -702,7 +704,8 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
|
||||
int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
|
||||
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
|
||||
int32_t vgId = TD_VID(pMeta->pVnode);
|
||||
|
||||
// generate the composed key for LRU cache
|
||||
|
@ -786,9 +789,10 @@ static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value)
|
|||
}
|
||||
|
||||
|
||||
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
|
||||
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
|
||||
int32_t payloadLen) {
|
||||
int32_t code = 0;
|
||||
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
|
||||
int32_t vgId = TD_VID(pMeta->pVnode);
|
||||
|
||||
if (payloadLen > tsTagFilterResCacheSize) {
|
||||
|
|
|
@ -700,8 +700,6 @@ int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
|
|||
return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries;
|
||||
}
|
||||
|
||||
int64_t metaGetNtbNum(SMeta *pMeta) { return pMeta->pVnode->config.vndStats.numOfNTables; }
|
||||
|
||||
typedef struct {
|
||||
SMeta *pMeta;
|
||||
TBC *pCur;
|
||||
|
|
|
@ -13,8 +13,10 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "vnodeInt.h"
|
||||
#include "meta.h"
|
||||
|
||||
|
||||
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME);
|
||||
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME);
|
||||
|
||||
|
|
|
@ -754,7 +754,7 @@ static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacit
|
|||
return terrno;
|
||||
}
|
||||
|
||||
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
|
||||
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void** ppReader, int32_t capacity,
|
||||
SSDataBlock* pResBlock, const char* idstr) {
|
||||
int32_t code = 0;
|
||||
int8_t level = 0;
|
||||
|
@ -4396,11 +4396,12 @@ static void freeSchemaFunc(void* param) {
|
|||
}
|
||||
|
||||
// ====================================== EXPOSED APIs ======================================
|
||||
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
|
||||
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr, bool countOnly, SHashObj** pIgnoreTables) {
|
||||
int32_t tsdbReaderOpen(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
|
||||
SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, SHashObj** pIgnoreTables) {
|
||||
STimeWindow window = pCond->twindows;
|
||||
SVnodeCfg* pConf = &(((SVnode*)pVnode)->config);
|
||||
|
||||
int32_t capacity = pVnode->config.tsdbCfg.maxRows;
|
||||
int32_t capacity = pConf->tsdbCfg.maxRows;
|
||||
if (pResBlock != NULL) {
|
||||
blockDataEnsureCapacity(pResBlock, capacity);
|
||||
}
|
||||
|
@ -4431,7 +4432,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
|
|||
}
|
||||
|
||||
// here we only need one more row, so the capacity is set to be ONE.
|
||||
code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, pResBlock, idstr);
|
||||
code = tsdbReaderCreate(pVnode, pCond, (void**)&((STsdbReader*)pReader)->innerReader[0], 1, pResBlock, idstr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -4445,7 +4446,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
|
|||
}
|
||||
pCond->order = order;
|
||||
|
||||
code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, pResBlock, idstr);
|
||||
code = tsdbReaderCreate(pVnode, pCond, (void**)&((STsdbReader*)pReader)->innerReader[1], 1, pResBlock, idstr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -4495,7 +4496,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
|
|||
goto _err;
|
||||
}
|
||||
|
||||
pReader->status.pLDataIter = taosMemoryCalloc(pVnode->config.sttTrigger, sizeof(SLDataIter));
|
||||
pReader->status.pLDataIter = taosMemoryCalloc(pConf->sttTrigger, sizeof(SLDataIter));
|
||||
if (pReader->status.pLDataIter == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
|
@ -5546,7 +5547,7 @@ int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTabl
|
|||
int64_t key = INT64_MIN;
|
||||
|
||||
for(int32_t i = 0; i < numOfTables; ++i) {
|
||||
int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, &pReader, pIdStr, false, NULL);
|
||||
int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, (void**)&pReader, pIdStr, false, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -31,17 +31,18 @@ static void initSnapshotFn(SStoreSnapshotFn* pSnapshot);
|
|||
void initStorageAPI(SStorageAPI* pAPI) {
|
||||
initTsdbReaderAPI(&pAPI->tsdReader);
|
||||
initMetadataAPI(&pAPI->metaFn);
|
||||
initTqAPI(&pAPI->tqReaderFn);
|
||||
initStateStoreAPI(&pAPI->stateStore);
|
||||
initMetaReaderAPI(&pAPI->metaReaderFn);
|
||||
initMetaFilterAPI(&pAPI->metaFilter);
|
||||
initTqAPI(&pAPI->tqReaderFn);
|
||||
initFunctionStateStore(&pAPI->functionStore);
|
||||
initCacheFn(&pAPI->cacheFn);
|
||||
initSnapshotFn(&pAPI->snapshotFn);
|
||||
}
|
||||
|
||||
void initTsdbReaderAPI(TsdReader* pReader) {
|
||||
pReader->tsdReaderOpen = (__store_reader_open_fn_t)tsdbReaderOpen;
|
||||
pReader->tsdReaderOpen = (int32_t(*)(void*, SQueryTableDataCond*, void*, int32_t, SSDataBlock*, void**, const char*,
|
||||
bool, SHashObj**))tsdbReaderOpen;
|
||||
pReader->tsdReaderClose = tsdbReaderClose;
|
||||
|
||||
pReader->tsdNextDataBlock = tsdbNextDataBlock;
|
||||
|
@ -87,6 +88,12 @@ void initMetadataAPI(SStoreMeta* pMeta) {
|
|||
|
||||
pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor
|
||||
pMeta->storeGetTableList = vnodeGetTableList;
|
||||
|
||||
pMeta->getCachedTableList = metaGetCachedTableUidList;
|
||||
pMeta->putCachedTableList = metaUidFilterCachePut;
|
||||
|
||||
pMeta->metaGetCachedTbGroup = metaGetCachedTbGroup;
|
||||
pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache;
|
||||
}
|
||||
|
||||
void initTqAPI(SStoreTqReader* pTq) {
|
||||
|
|
|
@ -495,7 +495,7 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
|
|||
genTbGroupDigest((SNode*)listNode, digest, &context);
|
||||
nodesFree(listNode);
|
||||
|
||||
pAPI->metaFn.getCachedTableList(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), &tableList);
|
||||
pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), &tableList);
|
||||
if (tableList) {
|
||||
taosArrayDestroy(pTableListInfo->pTableList);
|
||||
pTableListInfo->pTableList = tableList;
|
||||
|
@ -632,7 +632,7 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
|
|||
|
||||
if (tsTagFilterCache) {
|
||||
tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
|
||||
pAPI->metaFn.putTableListIntoCache(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), tableList, taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
|
||||
pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), tableList, taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
|
||||
}
|
||||
|
||||
// int64_t st2 = taosGetTimestampUs();
|
||||
|
|
|
@ -3446,7 +3446,7 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, S
|
|||
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
|
||||
pRes->info.id.groupId = groupId;
|
||||
|
||||
int64_t numOfTables = 0;//metaGetNtbNum(pInfo->readHandle.vnode);
|
||||
int64_t numOfTables = 0;
|
||||
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, NULL, &numOfTables);
|
||||
|
||||
if (numOfTables != 0) {
|
||||
|
|
Loading…
Reference in New Issue