From 1f7f326bedeee288a89c23ecade532d2ce3736d7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 24 May 2023 13:22:05 +0800 Subject: [PATCH] refactor: do some internal refactor and set the api function ptr. --- include/libs/executor/storageapi.h | 134 ++++++-------- include/libs/stream/streamState.h | 2 +- source/dnode/vnode/inc/vnode.h | 30 +-- source/dnode/vnode/src/inc/vnodeInt.h | 4 + source/dnode/vnode/src/meta/metaQuery.c | 144 ++++---------- source/dnode/vnode/src/meta/metaSma.c | 2 +- source/dnode/vnode/src/meta/metaTable.c | 4 +- source/dnode/vnode/src/sma/smaRollup.c | 4 +- source/dnode/vnode/src/tq/tq.c | 5 +- source/dnode/vnode/src/tq/tqRead.c | 8 +- source/dnode/vnode/src/tq/tqScan.c | 2 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 175 ++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeQuery.c | 42 +++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 + source/libs/executor/inc/querytask.h | 2 +- source/libs/executor/src/executil.c | 9 +- source/libs/executor/src/executor.c | 7 +- source/libs/executor/src/querytask.c | 10 +- source/libs/executor/src/scanoperator.c | 36 ++-- source/libs/executor/src/sysscanoperator.c | 33 ++-- source/libs/executor/src/timewindowoperator.c | 4 +- source/libs/stream/src/streamState.c | 29 +-- 24 files changed, 413 insertions(+), 281 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index c0fd6c34fc..8790c302ce 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -136,8 +136,8 @@ typedef struct SRowBuffPos { // void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); // void metaReaderReleaseLock(SMetaReader *pReader); // void metaReaderClear(SMetaReader *pReader); -// int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); -// int32_t metaGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid); +// int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); +// int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid); // int metaGetTableEntryByName(SMetaReader *pReader, const char *name); // int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList); // int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList); @@ -191,17 +191,13 @@ typedef struct { // int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); // bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid); // bool tqCurrentBlockConsumed(const STqReader* pReader); -// int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); +// int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); // bool tqNextBlockInWal(STqReader* pReader, const char* idstr); // bool tqNextBlockImpl(STqReader *pReader, const char* idstr); // int32_t getMetafromSnapShot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); // SMetaTableInfo getUidfromSnapShot(SSnapContext *ctx); // int32_t setForSnapShot(SSnapContext *ctx, int64_t uid); // int32_t destroySnapContext(SSnapContext *ctx); -// SMTbCursor *metaOpenTbCursor(SMeta *pMeta); -// void metaCloseTbCursor(SMTbCursor *pTbCur); -// int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType); -// int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType); /*-------------------------------------------------new api format---------------------------------------------------*/ @@ -229,17 +225,17 @@ typedef struct TsdReader { __store_reader_open_fn_t tsdReaderOpen; void (*tsdReaderClose)(); void (*tsdSetReaderTaskId)(void *pReader, const char *pId); - void (*tsdSetQueryTableList)(); - int32_t (*tsdReaderNextDataBlock)(); + int32_t (*tsdSetQueryTableList)(); + int32_t (*tsdNextDataBlock)(); int32_t (*tsdReaderRetrieveBlockSMAInfo)(); SSDataBlock *(*tsdReaderRetrieveDataBlock)(); void (*tsdReaderReleaseDataBlock)(); - void (*tsdReaderResetStatus)(); - void (*tsdReaderGetDataBlockDistInfo)(); - void (*tsdReaderGetNumOfInMemRows)(); + int32_t (*tsdReaderResetStatus)(); + int32_t (*tsdReaderGetDataBlockDistInfo)(); + int64_t (*tsdReaderGetNumOfInMemRows)(); void (*tsdReaderNotifyClosing)(); } TsdReader; @@ -270,13 +266,13 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid); bool tqCurrentBlockConsumed(const STqReader* pReader); -int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); +int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); bool tqNextBlockInWal(STqReader* pReader, const char* idstr); bool tqNextBlockImpl(STqReader *pReader, const char* idstr); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char* idstr); STqReader *tqReaderOpen(void *pVnode); -void tqCloseReader(STqReader *); +void tqReaderClose(STqReader *); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); @@ -285,7 +281,7 @@ int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas */ // todo rename typedef struct SStoreTqReader { - void *(*tqReaderOpen)(); + struct STqReader* (*tqReaderOpen)(); void (*tqReaderClose)(); int32_t (*tqReaderSeek)(); @@ -294,7 +290,7 @@ typedef struct SStoreTqReader { bool (*tqNextBlockImpl)(); // todo remove it void (*tqReaderSetColIdList)(); - int32_t (*tqReaderSetTargetTableList)(); + int32_t (*tqReaderSetQueryTableList)(); int32_t (*tqReaderAddTables)(); int32_t (*tqReaderRemoveTables)(); @@ -303,10 +299,10 @@ typedef struct SStoreTqReader { bool (*tqReaderCurrentBlockConsumed)(); struct SWalReader *(*tqReaderGetWalReader)(); // todo remove it - void (*tqReaderRetrieveTaosXBlock)(); // todo remove it + int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it int32_t (*tqReaderSetSubmitMsg)(); // todo remove it - void (*tqReaderNextBlockFilterOut)(); + bool (*tqReaderNextBlockFilterOut)(); } SStoreTqReader; typedef struct SStoreSnapshotFn { @@ -326,8 +322,8 @@ typedef struct SStoreSnapshotFn { void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); void metaReaderReleaseLock(SMetaReader *pReader); void metaReaderClear(SMetaReader *pReader); -int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); -int32_t metaGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid); +int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); +int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid); int metaGetTableEntryByName(SMetaReader *pReader, const char *name); int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList); const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal); @@ -347,39 +343,29 @@ int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen); */ - - typedef struct SStoreMetaReader { - void (*initReader)(void *pReader, void *pMeta, int32_t flags); - void *(*clearReader)(); - - void (*readerReleaseLock)(); - - int32_t (*getTableEntryByUid)(); - int32_t (*getTableEntryByName)(); - int32_t (*readerGetEntryGetUidCache)(SMetaReader *pReader, tb_uid_t uid); + void (*initReader)(SMetaReader *pReader, void *pMeta, int32_t flags); + void (*clearReader)(SMetaReader *pReader); + void (*readerReleaseLock)(SMetaReader *pReader); + int32_t (*getTableEntryByUid)(SMetaReader *pReader, tb_uid_t uid); + int32_t (*getTableEntryByName)(SMetaReader *pReader, const char *name); + int32_t (*getEntryGetUidCache)(SMetaReader *pReader, tb_uid_t uid); } SStoreMetaReader; typedef struct SStoreMeta { - /* -SMTbCursor *metaOpenTbCursor(SMeta *pMeta); -void metaCloseTbCursor(SMTbCursor *pTbCur); -int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType); -int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType); - */ - void *(*openMetaCursor)(); - void (*closeMetaCursor)(); - int32_t (*cursorNext)(); - void (*cursorPrev)(); + SMTbCursor *(*openTableMetaCursor)(); // metaOpenTbCursor + void (*closeTableMetaCursor)(); // metaCloseTbCursor + int32_t (*cursorNext)(); // metaTbCursorNext + int32_t (*cursorPrev)(); // metaTbCursorPrev int32_t (*getTableTags)(void *pVnode, uint64_t suid, SArray *uidList); int32_t (*getTableTagsByUid)(); - const char *(*extractTagVal)(const void *tag, int16_t type, STagVal *tagVal); // todo remove it + const void *(*extractTagVal)(const void *tag, int16_t type, STagVal *tagVal); // todo remove it int32_t (*getTableUidByName)(void *pVnode, char *tbName, uint64_t *uid); int32_t (*getTableTypeByName)(void *pVnode, char *tbName, ETableType *tbType); int32_t (*getTableNameByUid)(void *pVnode, uint64_t uid, char *tbName); - bool (*isTableExisted)(void *pVnode, uint64_t uid); + bool (*isTableExisted)(void *pVnode, tb_uid_t uid); /** * int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, @@ -397,8 +383,8 @@ int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, in * */ void *(*storeGetIndexInfo)(); - void *(*storeGetInvertIndex)(); - void (*storeGetChildTableList)(); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] + void *(*getInvertIndex)(void* pVnode); + int32_t (*getChildTableList)(void *pVnode, int64_t suid, SArray *list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] int32_t (*storeGetTableList)(); // vnodeGetStbIdList & vnodeGetAllTableList void *storeGetVersionRange; void *storeGetLastTimestamp; @@ -406,8 +392,8 @@ int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, in int32_t (*getTableSchema)(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); // tsdbGetTableSchema // db name, vgId, numOfTables, numOfSTables - void (*storeGetNumOfChildTables)(); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo); - void (*storeGetBasicInfo)(); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta); + int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo); + void (*getBasicInfo)(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables);// vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta); int64_t (*getNumOfRowsInMem)(); /** @@ -497,50 +483,50 @@ typedef struct SStateStore { int32_t (*streamStateFillGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t (*streamStateFillDel)(SStreamState* pState, const SWinKey* key); - int32_t (*streamStateCurNext)(SStreamState* pState, void* pCur); - int32_t (*streamStateCurPrev)(SStreamState* pState, void* pCur); + int32_t (*streamStateCurNext)(SStreamState* pState, SStreamStateCur* pCur); + int32_t (*streamStateCurPrev)(SStreamState* pState, SStreamStateCur* pCur); - void* (*streamStateGetAndCheckCur)(SStreamState* pState, SWinKey* key); - void* (*streamStateSeekKeyNext)(SStreamState* pState, const SWinKey* key); - void* (*streamStateFillSeekKeyNext)(SStreamState* pState, const SWinKey* key); - void* (*streamStateFillSeekKeyPrev)(SStreamState* pState, const SWinKey* key); - void (*streamStateFreeCur)(void* pCur); + SStreamStateCur* (*streamStateGetAndCheckCur)(SStreamState* pState, SWinKey* key); + SStreamStateCur* (*streamStateSeekKeyNext)(SStreamState* pState, const SWinKey* key); + SStreamStateCur* (*streamStateFillSeekKeyNext)(SStreamState* pState, const SWinKey* key); + SStreamStateCur* (*streamStateFillSeekKeyPrev)(SStreamState* pState, const SWinKey* key); + void (*streamStateFreeCur)(SStreamStateCur* pCur); - int32_t (*streamStateGetGroupKVByCur)(void* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); - int32_t (*streamStateGetKVByCur)(void* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); + int32_t (*streamStateGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); + int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen); int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); int32_t (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key); int32_t (*streamStateSessionClear)(SStreamState* pState); - int32_t (*streamStateSessionGetKVByCur)(void* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); + int32_t (*streamStateSessionGetKVByCur)(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); int32_t (*streamStateStateAddIfNotExist)(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); - int32_t (*streamStateSessionGetKeyByRange)(void* pState, const SSessionKey* range, SSessionKey* curKey); + int32_t (*streamStateSessionGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey); - void* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark); - TSKEY (*updateInfoFillBlockData)(void *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); - bool (*updateInfoIsUpdated)(void *pInfo, uint64_t tableId, TSKEY ts); - bool (*updateInfoIsTableInserted)(void *pInfo, int64_t tbUid); - void (*updateInfoDestroy)(void *pInfo); + SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark); + TSKEY (*updateInfoFillBlockData)(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); + bool (*updateInfoIsUpdated)(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); + bool (*updateInfoIsTableInserted)(SUpdateInfo *pInfo, int64_t tbUid); + void (*updateInfoDestroy)(SUpdateInfo *pInfo); SUpdateInfo* (*updateInfoInitP)(SInterval *pInterval, int64_t watermark); - void (*updateInfoAddCloseWindowSBF)(void *pInfo); - void (*updateInfoDestoryColseWinSBF)(void *pInfo); - int32_t (*updateInfoSerialize)(void *buf, int32_t bufLen, const void *pInfo); - int32_t (*updateInfoDeserialize)(void *buf, int32_t bufLen, void *pInfo); + void (*updateInfoAddCloseWindowSBF)(SUpdateInfo *pInfo); + void (*updateInfoDestoryColseWinSBF)(SUpdateInfo *pInfo); + int32_t (*updateInfoSerialize)(void *buf, int32_t bufLen, const SUpdateInfo *pInfo); + int32_t (*updateInfoDeserialize)(void *buf, int32_t bufLen, SUpdateInfo *pInfo); - void* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key); - void* (*streamStateSessionSeekKeyCurrentPrev)(SStreamState* pState, const SSessionKey* key); - void* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key); + SStreamStateCur* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key); + SStreamStateCur* (*streamStateSessionSeekKeyCurrentPrev)(SStreamState* pState, const SSessionKey* key); + SStreamStateCur* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key); - void* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, - void* pFile, TSKEY delMark); + struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize, + uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark); - void (*streamFileStateDestroy)(void* pFileState); - void (*streamFileStateClear)(void* pFileState); - bool (*needClearDiskBuff)(void* pFileState); + void (*streamFileStateDestroy)(struct SStreamFileState* pFileState); + void (*streamFileStateClear)(struct SStreamFileState* pFileState); + bool (*needClearDiskBuff)(struct SStreamFileState* pFileState); SStreamState* (*streamStateOpen)(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); void (*streamStateClose)(SStreamState* pState, bool remove); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index f9fffef7c2..08357b8251 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -59,7 +59,7 @@ extern "C" { // TXN* txn; //} STdbState; -SStreamState* streamStateOpen(char* path, struct SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); +SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); void streamStateClose(SStreamState* pState, bool remove); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index f6d731f2b4..95fe150443 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -67,17 +67,17 @@ int32_t vnodeStart(SVnode *pVnode); void vnodeStop(SVnode *pVnode); int64_t vnodeGetSyncHandle(SVnode *pVnode); void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); -void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId); +void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list); int32_t vnodeIsCatchUp(SVnode *pVnode); ESyncRole vnodeGetRole(SVnode *pVnode); -int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list); +int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list); int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray *list); -void *vnodeGetIdx(SVnode *pVnode); -void *vnodeGetIvtIdx(SVnode *pVnode); +void *vnodeGetIdx(void *pVnode); +void *vnodeGetIvtIdx(void *pVnode); int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num); int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num); @@ -105,22 +105,22 @@ typedef struct SMetaEntry SMetaEntry; #define META_READER_NOLOCK 0x1 -void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); +void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags); void metaReaderReleaseLock(SMetaReader *pReader); void metaReaderClear(SMetaReader *pReader); -int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); -int32_t metaGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid); +int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); +int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid); int metaGetTableEntryByName(SMetaReader *pReader, const char *name); int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList); int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList); int32_t metaReadNext(SMetaReader *pReader); -const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal); +const void *metaGetTableTagVal(const void *tag, int16_t type, STagVal *tagVal); int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName); int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName); int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid); int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType); -bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid); +bool metaIsTableExist(void* pVnode, tb_uid_t uid); int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList, bool *acquired); int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, @@ -138,7 +138,7 @@ int64_t metaGetNtbNum(SMeta *pMeta); // int64_t uid; // int64_t ctbNum; //} SMetaStbStats; -int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo); +int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables); //typedef struct SMetaFltParam { // tb_uid_t suid; @@ -163,7 +163,7 @@ typedef SVCreateTSmaReq SSmaCfg; typedef struct SMTbCursor SMTbCursor; -SMTbCursor *metaOpenTbCursor(SMeta *pMeta); +SMTbCursor *metaOpenTbCursor(void *pVnode); void metaCloseTbCursor(SMTbCursor *pTbCur); int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType); int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType); @@ -260,16 +260,20 @@ typedef struct STqReader { } STqReader; STqReader *tqReaderOpen(SVnode *pVnode); -void tqCloseReader(STqReader *); +void tqReaderClose(STqReader *); void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); -int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); +bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid); +bool tqCurrentBlockConsumed(const STqReader* pReader); + +int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); bool tqNextBlockInWal(STqReader *pReader, const char *idstr); bool tqNextBlockImpl(STqReader *pReader, const char *idstr); +SWalReader* tqGetWalReader(STqReader* pReader); int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, const char *id); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 230e8b8f88..735ff1700c 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -161,6 +161,8 @@ void* metaGetIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta); int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList); +void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); + int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); @@ -469,6 +471,8 @@ struct SCompactInfo { STimeWindow tw; }; +void initStorageAPI(SStorageAPI* pAPI); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 48bfc5d86f..40f4b6fa67 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -17,6 +17,11 @@ #include "osMemory.h" #include "tencode.h" +void _metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags) { + SMeta* pMeta = ((SVnode*)pVnode)->pMeta; + metaReaderInit(pReader, pMeta, flags); +} + void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) { memset(pReader, 0, sizeof(*pReader)); pReader->flags = flags; @@ -64,96 +69,20 @@ _err: return -1; } -// int metaGetTableEntryByUidTest(void* meta, SArray *uidList) { -// -// SArray* readerList = taosArrayInit(taosArrayGetSize(uidList), sizeof(SMetaReader)); -// SArray* uidVersion = taosArrayInit(taosArrayGetSize(uidList), sizeof(STbDbKey)); -// SMeta *pMeta = meta; -// int64_t version; -// SHashObj *uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); -// -// int64_t stt1 = taosGetTimestampUs(); -// for(int i = 0; i < taosArrayGetSize(uidList); i++) { -// void* ppVal = NULL; -// int vlen = 0; -// uint64_t * uid = taosArrayGet(uidList, i); -// // query uid.idx -// if (tdbTbGet(pMeta->pUidIdx, uid, sizeof(*uid), &ppVal, &vlen) < 0) { -// continue; -// } -// version = *(int64_t *)ppVal; -// -// STbDbKey tbDbKey = {.version = version, .uid = *uid}; -// taosArrayPush(uidVersion, &tbDbKey); -// taosHashPut(uHash, uid, sizeof(int64_t), ppVal, sizeof(int64_t)); -// } -// int64_t stt2 = taosGetTimestampUs(); -// qDebug("metaGetTableEntryByUidTest1 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt2-stt1); -// -// TBC *pCur = NULL; -// tdbTbcOpen(pMeta->pTbDb, &pCur, NULL); -// tdbTbcMoveToFirst(pCur); -// void *pKey = NULL; -// int kLen = 0; -// -// while(1){ -// SMetaReader pReader = {0}; -// int32_t ret = tdbTbcNext(pCur, &pKey, &kLen, &pReader.pBuf, &pReader.szBuf); -// if (ret < 0) break; -// STbDbKey *tmp = (STbDbKey*)pKey; -// int64_t *ver = (int64_t*)taosHashGet(uHash, &tmp->uid, sizeof(int64_t)); -// if(ver == NULL || *ver != tmp->version) continue; -// taosArrayPush(readerList, &pReader); -// } -// tdbTbcClose(pCur); -// -// taosArrayClear(readerList); -// int64_t stt3 = taosGetTimestampUs(); -// qDebug("metaGetTableEntryByUidTest2 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt3-stt2); -// for(int i = 0; i < taosArrayGetSize(uidVersion); i++) { -// SMetaReader pReader = {0}; -// -// STbDbKey *tbDbKey = taosArrayGet(uidVersion, i); -// // query table.db -// if (tdbTbGet(pMeta->pTbDb, tbDbKey, sizeof(STbDbKey), &pReader.pBuf, &pReader.szBuf) < 0) { -// continue; -// } -// taosArrayPush(readerList, &pReader); -// } -// int64_t stt4 = taosGetTimestampUs(); -// qDebug("metaGetTableEntryByUidTest3 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt4-stt3); -// -// for(int i = 0; i < taosArrayGetSize(readerList); i++){ -// SMetaReader* pReader = taosArrayGet(readerList, i); -// metaReaderInit(pReader, meta, 0); -// // decode the entry -// tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf); -// -// if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) { -// } -// metaReaderClear(pReader); -// } -// int64_t stt5 = taosGetTimestampUs(); -// qDebug("metaGetTableEntryByUidTest4 rows:%d, cost:%ld us", taosArrayGetSize(readerList), stt5-stt4); -// return 0; -// } - -bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) { - // query uid.idx - metaRLock(pMeta); - - if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) < 0) { - metaULock(pMeta); +bool metaIsTableExist(void *pVnode, tb_uid_t uid) { + SVnode* pVnodeObj = pVnode; + metaRLock(pVnodeObj->pMeta); // query uid.idx + if (tdbTbGet(pVnodeObj->pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) < 0) { + metaULock(pVnodeObj->pMeta); return false; } - metaULock(pMeta); - + metaULock(pVnodeObj->pMeta); return true; } -int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { +int metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { SMeta *pMeta = pReader->pMeta; int64_t version1; @@ -167,7 +96,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { return metaGetTableEntryByVersion(pReader, version1, uid); } -int metaGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid) { +int metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid) { SMeta *pMeta = pReader->pMeta; SMetaInfo info; @@ -190,7 +119,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) { } uid = *(tb_uid_t *)pReader->pBuf; - return metaGetTableEntryByUid(pReader, uid); + return metaReaderGetTableEntryByUid(pReader, uid); } tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) { @@ -214,7 +143,7 @@ int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName) { int code = 0; SMetaReader mr = {0}; metaReaderInit(&mr, (SMeta *)meta, 0); - code = metaGetTableEntryByUid(&mr, uid); + code = metaReaderGetTableEntryByUid(&mr, uid); if (code < 0) { metaReaderClear(&mr); return -1; @@ -230,7 +159,7 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) { int code = 0; SMetaReader mr = {0}; metaReaderInit(&mr, (SMeta *)meta, 0); - code = metaGetTableEntryByUid(&mr, uid); + code = metaReaderGetTableEntryByUid(&mr, uid); if (code < 0) { metaReaderClear(&mr); return -1; @@ -283,7 +212,7 @@ int metaReadNext(SMetaReader *pReader) { } #if 1 // =================================================== -SMTbCursor *metaOpenTbCursor(SMeta *pMeta) { +SMTbCursor *metaOpenTbCursor(void *pVnode) { SMTbCursor *pTbCur = NULL; pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur)); @@ -291,12 +220,12 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) { return NULL; } - metaReaderInit(&pTbCur->mr, pMeta, 0); + SVnode* pVnodeObj = pVnode; + metaReaderInit(&pTbCur->mr, pVnodeObj->pMeta, 0); - tdbTbcOpen(pMeta->pUidIdx, (TBC **)&pTbCur->pDbc, NULL); + tdbTbcOpen(pVnodeObj->pMeta->pUidIdx, (TBC **)&pTbCur->pDbc, NULL); tdbTbcMoveToFirst((TBC *)pTbCur->pDbc); - return pTbCur; } @@ -876,7 +805,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { STSma *pTSma = NULL; for (int i = 0; i < pSW->number; ++i) { smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i); - if (metaGetTableEntryByUid(&mr, smaId) < 0) { + if (metaReaderGetTableEntryByUid(&mr, smaId) < 0) { tDecoderClear(&mr.coder); metaWarn("vgId:%d, no entry for tbId:%" PRIi64 ", smaId:%" PRIi64, TD_VID(pMeta->pVnode), uid, smaId); continue; @@ -926,7 +855,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { STSma *pTSma = NULL; SMetaReader mr = {0}; metaReaderInit(&mr, pMeta, 0); - if (metaGetTableEntryByUid(&mr, indexUid) < 0) { + if (metaReaderGetTableEntryByUid(&mr, indexUid) < 0) { metaWarn("vgId:%d, failed to get table entry for smaId:%" PRIi64, TD_VID(pMeta->pVnode), indexUid); metaReaderClear(&mr); return NULL; @@ -1027,7 +956,7 @@ SArray *metaGetSmaTbUids(SMeta *pMeta) { #endif -const void *metaGetTableTagVal(void *pTag, int16_t type, STagVal *val) { +const void *metaGetTableTagVal(const void *pTag, int16_t type, STagVal *val) { STag *tag = (STag *)pTag; if (type == TSDB_DATA_TYPE_JSON) { return tag; @@ -1565,30 +1494,35 @@ _exit: return code; } -int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo) { +int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t* numOfTables) { int32_t code = 0; + *numOfTables = 0; - metaRLock(pMeta); + SVnode* pVnodeObj = pVnode; + metaRLock(pVnodeObj->pMeta); // fast path: search cache - if (metaStatsCacheGet(pMeta, uid, pInfo) == TSDB_CODE_SUCCESS) { - metaULock(pMeta); + SMetaStbStats state = {0}; + if (metaStatsCacheGet(pVnodeObj->pMeta, uid, &state) == TSDB_CODE_SUCCESS) { + metaULock(pVnodeObj->pMeta); + *numOfTables = state.ctbNum; goto _exit; } // slow path: search TDB int64_t ctbNum = 0; - vnodeGetCtbNum(pMeta->pVnode, uid, &ctbNum); + vnodeGetCtbNum(pVnode, uid, &ctbNum); - metaULock(pMeta); + metaULock(pVnodeObj->pMeta); + *numOfTables = ctbNum; - pInfo->uid = uid; - pInfo->ctbNum = ctbNum; + state.uid = uid; + state.ctbNum = ctbNum; // upsert the cache - metaWLock(pMeta); - metaStatsCacheUpsert(pMeta, pInfo); - metaULock(pMeta); + metaWLock(pVnodeObj->pMeta); + metaStatsCacheUpsert(pVnodeObj->pMeta, &state); + metaULock(pVnodeObj->pMeta); _exit: return code; diff --git a/source/dnode/vnode/src/meta/metaSma.c b/source/dnode/vnode/src/meta/metaSma.c index 8d5821e28b..3fb491848a 100644 --- a/source/dnode/vnode/src/meta/metaSma.c +++ b/source/dnode/vnode/src/meta/metaSma.c @@ -36,7 +36,7 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) { // validate req // save smaIndex metaReaderInit(&mr, pMeta, 0); - if (metaGetTableEntryByUidCache(&mr, pCfg->indexUid) == 0) { + if (metaReaderGetTableEntryByUidCache(&mr, pCfg->indexUid) == 0) { #if 1 terrno = TSDB_CODE_TSMA_ALREADY_EXIST; metaReaderClear(&mr); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 0164a82c69..df16304e28 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -690,7 +690,7 @@ _err: return -1; } -int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) { +int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) { SMetaEntry me = {0}; SMetaReader mr = {0}; @@ -729,7 +729,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMe metaReaderClear(&mr); // build SMetaEntry - me.version = version; + me.version = ver; me.type = pReq->type; me.uid = pReq->uid; me.name = pReq->name; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index f66f85cc88..558cfdac2b 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -894,7 +894,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) { metaReaderInit(&mr, SMA_META(pSma), 0); smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid); - if (metaGetTableEntryByUidCache(&mr, pInfo->suid) < 0) { + if (metaReaderGetTableEntryByUidCache(&mr, pInfo->suid) < 0) { code = terrno; TSDB_CHECK_CODE(code, lino, _exit); } @@ -1121,7 +1121,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { for (int64_t i = 0; i < arrSize; ++i) { suid = *(tb_uid_t *)taosArrayGet(suidList, i); smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid); - if (metaGetTableEntryByUidCache(&mr, suid) < 0) { + if (metaReaderGetTableEntryByUidCache(&mr, suid) < 0) { code = terrno; TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6befccdab2..4adbf02302 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -69,12 +69,12 @@ static void destroyTqHandle(void* data) { if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { taosMemoryFreeClear(pData->execHandle.execCol.qmsg); } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) { - tqCloseReader(pData->execHandle.pTqReader); + tqReaderClose(pData->execHandle.pTqReader); walCloseReader(pData->pWalReader); taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid); } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { walCloseReader(pData->pWalReader); - tqCloseReader(pData->execHandle.pTqReader); + tqReaderClose(pData->execHandle.pTqReader); } if(pData->msg != NULL) { rpcFreeCont(pData->msg->pCont); @@ -787,6 +787,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } SReadHandle handle = {.vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState}; + initStorageAPI(&handle.api); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); if (pTask->exec.pExecutor == NULL) { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 84ead71fb5..2521d956a3 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -273,7 +273,7 @@ STqReader* tqReaderOpen(SVnode* pVnode) { return pReader; } -void tqCloseReader(STqReader* pReader) { +void tqReaderClose(STqReader* pReader) { // close wal reader if (pReader->pWalReader) { walCloseReader(pReader->pWalReader); @@ -294,7 +294,7 @@ void tqCloseReader(STqReader* pReader) { taosMemoryFree(pReader); } -int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { +int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) { if (walReaderSeekVer(pReader->pWalReader, ver) < 0) { return -1; } @@ -605,6 +605,8 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++); SSDataBlock* pBlock = pReader->pResBlock; + *pRes = pBlock; + blockDataCleanup(pBlock); int32_t sversion = pSubmitTbData->sver; @@ -1084,7 +1086,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { for (int32_t i = 0; i < taosArrayGetSize(tbUidList); ++i) { uint64_t* id = (uint64_t*)taosArrayGet(tbUidList, i); - int32_t code = metaGetTableEntryByUidCache(&mr, *id); + int32_t code = metaReaderGetTableEntryByUidCache(&mr, *id); if (code != TSDB_CODE_SUCCESS) { tqError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); continue; diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index e268199e16..98304f9c18 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -51,7 +51,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in metaReaderInit(&mr, pTq->pVnode->pMeta, 0); // TODO add reference to gurantee success - if (metaGetTableEntryByUidCache(&mr, uid) < 0) { + if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) { metaReaderClear(&mr); return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index c0a8de5743..2d8b7f9ecf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -980,7 +980,7 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) { SMetaReader mr = {0}; metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0); - if (metaGetTableEntryByUidCache(&mr, uid) < 0) { + if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) { metaReaderClear(&mr); // table not esist return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index cde2672541..414ccfc8b2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -5376,7 +5376,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) { SMetaReader mr = {0}; metaReaderInit(&mr, pVnode->pMeta, 0); - int32_t code = metaGetTableEntryByUidCache(&mr, uid); + int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid); if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; metaReaderClear(&mr); @@ -5389,7 +5389,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 if (mr.me.type == TSDB_CHILD_TABLE) { tDecoderClear(&mr.coder); *suid = mr.me.ctbEntry.suid; - code = metaGetTableEntryByUidCache(&mr, *suid); + code = metaReaderGetTableEntryByUidCache(&mr, *suid); if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; metaReaderClear(&mr); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index b5e7c6875b..ebfecc4c15 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -14,6 +14,7 @@ */ #include "vnd.h" +#include "tstreamUpdate.h" int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { SVnodeInfo info = {0}; @@ -444,3 +445,177 @@ void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyTerm = pVnode->state.commitTerm; pSnapshot->lastConfigIndex = -1; } + +static void initTsdbReaderAPI(TsdReader* pReader); +static void initMetadataAPI(SStoreMeta* pMeta); +static void initTqAPI(SStoreTqReader* pTq); +static void initStateStoreAPI(SStateStore* pStore); +static void initMetaReaderAPI(SStoreMetaReader* pMetaReader); + +void initStorageAPI(SStorageAPI* pAPI) { + initTsdbReaderAPI(&pAPI->tsdReader); + initMetadataAPI(&pAPI->metaFn); + initTqAPI(&pAPI->tqReaderFn); + initStateStoreAPI(&pAPI->stateStore); + initMetaReaderAPI(&pAPI->metaReaderFn); +} + +void initTsdbReaderAPI(TsdReader* pReader) { + pReader->tsdReaderOpen = (__store_reader_open_fn_t)tsdbReaderOpen; + pReader->tsdReaderClose = tsdbReaderClose; + + pReader->tsdNextDataBlock = tsdbNextDataBlock; + + pReader->tsdReaderRetrieveDataBlock = tsdbRetrieveDataBlock; + pReader->tsdReaderReleaseDataBlock = tsdbReleaseDataBlock; + + pReader->tsdReaderRetrieveBlockSMAInfo = tsdbRetrieveDatablockSMA; + + pReader->tsdReaderNotifyClosing = tsdbReaderSetCloseFlag; + pReader->tsdReaderResetStatus = tsdbReaderReset; + + pReader->tsdReaderGetDataBlockDistInfo = tsdbGetFileBlocksDistInfo; + pReader->tsdReaderGetNumOfInMemRows = tsdbGetNumOfRowsInMemTable; // todo this function should be moved away + + pReader->tsdSetQueryTableList = tsdbSetTableList; + pReader->tsdSetReaderTaskId = (void (*)(void *, const char *))tsdbReaderSetId; +} + +void initMetadataAPI(SStoreMeta* pMeta) { + pMeta->isTableExisted = metaIsTableExist; + + pMeta->openTableMetaCursor = metaOpenTbCursor; + pMeta->closeTableMetaCursor = metaCloseTbCursor; + pMeta->cursorNext = metaTbCursorNext; + pMeta->cursorPrev = metaTbCursorPrev; + + pMeta->getBasicInfo = vnodeGetInfo; + pMeta->getNumOfChildTables = metaGetStbStats; + + pMeta->getChildTableList = vnodeGetCtbIdList; + + pMeta->storeGetIndexInfo = vnodeGetIdx; + pMeta->getInvertIndex = vnodeGetIvtIdx; + + pMeta->extractTagVal = (const void *(*)(const void *, int16_t, STagVal *))metaGetTableTagVal; + +} + +void initTqAPI(SStoreTqReader* pTq) { + pTq->tqReaderOpen = tqReaderOpen; + pTq->tqReaderSetColIdList = tqReaderSetColIdList; + + pTq->tqReaderClose = tqReaderClose; + pTq->tqReaderSeek = tqReaderSeek; + pTq->tqRetrieveBlock = tqRetrieveDataBlock; + + pTq->tqReaderNextBlockInWal = tqNextBlockInWal; + + pTq->tqNextBlockImpl = tqNextBlockImpl;// todo remove it + + pTq->tqReaderAddTables = tqReaderAddTbUidList; + pTq->tqReaderSetQueryTableList = tqReaderSetTbUidList; + + pTq->tqReaderRemoveTables = tqReaderRemoveTbUidList; + + pTq->tqReaderIsQueriedTable = tqReaderIsQueriedTable; + pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed; + + pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it + pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it + + pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it + pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut; +} + +void initStateStoreAPI(SStateStore* pStore) { + pStore->streamFileStateInit = streamFileStateInit; + pStore->updateInfoDestoryColseWinSBF = updateInfoDestoryColseWinSBF; + + pStore->streamStateGetByPos = streamStateGetByPos; + + pStore->streamStatePutParName = streamStatePutParName; + pStore->streamStateGetParName = streamStateGetParName; + + pStore->streamStateAddIfNotExist = streamStateAddIfNotExist; + pStore->streamStateReleaseBuf = streamStateReleaseBuf; + pStore->streamStateFreeVal = streamStateFreeVal; + + pStore->streamStatePut = streamStatePut; + pStore->streamStateGet = streamStateGet; + pStore->streamStateCheck = streamStateCheck; + pStore->streamStateGetByPos = streamStateGetByPos; + pStore->streamStateDel = streamStateDel; + pStore->streamStateClear = streamStateClear; + pStore->streamStateSaveInfo = streamStateSaveInfo; + pStore->streamStateGetInfo = streamStateGetInfo; + pStore->streamStateSetNumber = streamStateSetNumber; + + pStore->streamStateFillPut = streamStateFillPut; + pStore->streamStateFillGet = streamStateFillGet; + pStore->streamStateFillDel = streamStateFillDel; + + pStore->streamStateCurNext = streamStateCurNext; + pStore->streamStateCurPrev = streamStateCurPrev; + + pStore->streamStateGetAndCheckCur = streamStateGetAndCheckCur; + pStore->streamStateSeekKeyNext = streamStateSeekKeyNext; + pStore->streamStateFillSeekKeyNext = streamStateFillSeekKeyNext; + pStore->streamStateFillSeekKeyPrev = streamStateFillSeekKeyPrev; + pStore->streamStateFreeCur = streamStateFreeCur; + + pStore->streamStateGetGroupKVByCur = streamStateGetGroupKVByCur; + pStore->streamStateGetKVByCur = streamStateGetKVByCur; + + pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist; + pStore->streamStateSessionPut = streamStateSessionPut; + pStore->streamStateSessionGet = streamStateSessionGet; + pStore->streamStateSessionDel = streamStateSessionDel; + pStore->streamStateSessionClear = streamStateSessionClear; + pStore->streamStateSessionGetKVByCur = streamStateSessionGetKVByCur; + pStore->streamStateStateAddIfNotExist = streamStateStateAddIfNotExist; + pStore->streamStateSessionGetKeyByRange = streamStateSessionGetKeyByRange; + + pStore->updateInfoInit = updateInfoInit; + pStore->updateInfoFillBlockData = updateInfoFillBlockData; + pStore->updateInfoIsUpdated = updateInfoIsUpdated; + pStore->updateInfoIsTableInserted = updateInfoIsTableInserted; + pStore->updateInfoDestroy = updateInfoDestroy; + + pStore->updateInfoInitP = updateInfoInitP; + pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF; + pStore->updateInfoDestoryColseWinSBF = updateInfoDestoryColseWinSBF; + pStore->updateInfoSerialize = updateInfoSerialize; + pStore->updateInfoDeserialize = updateInfoDeserialize; + + pStore->streamStateSessionSeekKeyNext = streamStateSessionSeekKeyNext; + pStore->streamStateSessionSeekKeyCurrentPrev = streamStateSessionSeekKeyCurrentPrev; + pStore->streamStateSessionSeekKeyCurrentNext = streamStateSessionSeekKeyCurrentNext; + + pStore->streamFileStateInit = streamFileStateInit; + + pStore->streamFileStateDestroy = streamFileStateDestroy; + pStore->streamFileStateClear = streamFileStateClear; + pStore->needClearDiskBuff = needClearDiskBuff; + + pStore->streamStateOpen = streamStateOpen; + pStore->streamStateClose = streamStateClose; + pStore->streamStateBegin = streamStateBegin; + pStore->streamStateCommit = streamStateCommit; + pStore->streamStateDestroy= streamStateDestroy; + pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; +} + +void initMetaReaderAPI(SStoreMetaReader* pMetaReader) { + pMetaReader->initReader = _metaReaderInit; + pMetaReader->clearReader = metaReaderClear; + + pMetaReader->getTableEntryByUid = metaReaderGetTableEntryByUid; + pMetaReader->clearReader = metaReaderClear; + + pMetaReader->getEntryGetUidCache = metaReaderGetTableEntryByUidCache; + pMetaReader->getTableEntryByName = metaGetTableEntryByName; + + pMetaReader->readerReleaseLock = metaReaderReleaseLock; +} + diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 303d2a9ca4..ac3e632172 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -80,7 +80,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { metaRsp.suid = mer1.me.uid; } else if (mer1.me.type == TSDB_CHILD_TABLE) { metaReaderInit(&mer2, pVnode->pMeta, META_READER_NOLOCK); - if (metaGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit; + if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit; strcpy(metaRsp.stbName, mer2.me.name); metaRsp.suid = mer2.me.uid; @@ -189,7 +189,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { goto _exit; } else if (mer1.me.type == TSDB_CHILD_TABLE) { metaReaderInit(&mer2, pVnode->pMeta, 0); - if (metaGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit; + if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit; strcpy(cfgRsp.stbName, mer2.me.name); schema = mer2.me.stbEntry.schemaRow; @@ -410,13 +410,24 @@ void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { "nBatchInsertSuccess"); } -void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) { +void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables) { + SVnode* pVnodeObj = pVnode; + SVnodeCfg* pConf = &pVnodeObj->config; + if (dbname) { - *dbname = pVnode->config.dbname; + *dbname = pConf->dbname; } if (vgId) { - *vgId = TD_VID(pVnode); + *vgId = TD_VID(pVnodeObj); + } + + if (numOfTables) { + *numOfTables = pConf->vndStats.numOfNTables + pConf->vndStats.numOfCTables; + } + + if (numOfNormalTables) { + *numOfNormalTables = pConf->vndStats.numOfNTables; } } @@ -440,8 +451,10 @@ int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) { int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg) { return 0; } -int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) { - SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid, 1); + +int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) { + SVnode *pVnodeObj = pVnode; + SMCtbCursor *pCur = metaOpenCtbCursor(pVnodeObj->pMeta, suid, 1); while (1) { tb_uid_t id = metaCtbCursorNext(pCur); @@ -529,10 +542,8 @@ int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num) { for (int64_t i = 0; i < arrSize; ++i) { tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); - SMetaStbStats stats = {0}; - metaGetStbStats(pVnode->pMeta, suid, &stats); - int64_t ctbNum = stats.ctbNum; - // vnodeGetCtbNum(pVnode, id, &ctbNum); + int64_t ctbNum = 0; + metaGetStbStats(pVnode, suid, &ctbNum); int numOfCols = 0; vnodeGetStbColumnNum(pVnode, suid, &numOfCols); @@ -567,16 +578,17 @@ int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num) { return TSDB_CODE_SUCCESS; } -void *vnodeGetIdx(SVnode *pVnode) { +void *vnodeGetIdx(void *pVnode) { if (pVnode == NULL) { return NULL; } - return metaGetIdx(pVnode->pMeta); + + return metaGetIdx(((SVnode*)pVnode)->pMeta); } -void *vnodeGetIvtIdx(SVnode *pVnode) { +void *vnodeGetIvtIdx(void *pVnode) { if (pVnode == NULL) { return NULL; } - return metaGetIvtIdx(pVnode->pMeta); + return metaGetIvtIdx(((SVnode*)pVnode)->pMeta); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 268d51cd16..d16102037d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -515,6 +515,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { } SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; + initStorageAPI(&handle.api); + switch (pMsg->msgType) { case TDMT_SCH_QUERY: case TDMT_SCH_MERGE_QUERY: diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 1c2ed6c076..6497bd90b4 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -95,7 +95,7 @@ struct SExecTaskInfo { }; void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst); -SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model); +SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI); void doDestroyTask(SExecTaskInfo* pTaskInfo); bool isTaskKilled(SExecTaskInfo* pTaskInfo); void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index b24fcade79..a52e73eb49 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -307,7 +307,7 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, SMetaReader mr = {0}; pAPI->metaReaderFn.initReader(&mr, metaHandle, 0); - code = pAPI->metaReaderFn.readerGetEntryGetUidCache(&mr, info->uid); + code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, info->uid); if (TSDB_CODE_SUCCESS != code) { pAPI->metaReaderFn.clearReader(&mr); *pQualified = false; @@ -1090,12 +1090,11 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S } if (!pTagCond) { // no tag filter condition exists, let's fetch all tables of this super table -// vnodeGetCtbIdList(); - pStorageAPI->metaFn.storeGetChildTableList(pVnode, pScanNode->suid, pUidList); + pStorageAPI->metaFn.getChildTableList(pVnode, pScanNode->suid, pUidList); } else { // failed to find the result in the cache, let try to calculate the results if (pTagIndexCond) { - void* pIndex = pStorageAPI->metaFn.storeGetInvertIndex(pVnode); + void* pIndex = pStorageAPI->metaFn.getInvertIndex(pVnode); SIndexMetaArg metaArg = { .metaEx = pVnode, .idx = pStorageAPI->metaFn.storeGetIndexInfo(pVnode), .ivtIdx = pIndex, .suid = pScanNode->uid}; @@ -1170,7 +1169,7 @@ int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, SMetaReader mr = {0}; pAPI->metaReaderFn.initReader(&mr, pVnode, 0); - if (pAPI->metaReaderFn.readerGetEntryGetUidCache(&mr, uid) != 0) { // table not exist + if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) { // table not exist pAPI->metaReaderFn.clearReader(&mr); return TSDB_CODE_PAR_TABLE_NOT_EXIST; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index cad98348d0..78a015269a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -252,7 +252,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id) { if (msg == NULL) { // create raw scan - SExecTaskInfo* pTaskInfo = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE); + SExecTaskInfo* pTaskInfo = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api); if (NULL == pTaskInfo) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -264,6 +264,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 return NULL; } + pTaskInfo->storageAPI = pReaderHandle->api; qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pTaskInfo)); return pTaskInfo; } @@ -1092,8 +1093,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ASSERT(0); // walReaderVerifyOffset(pInfo->tqReader->pWalReader, pOffset); -// if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) { -// qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id); +// if (tqReaderSeek(pInfo->tqReader, pOffset->version + 1, id) < 0) { +// qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version + 1, id); // return -1; // } } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index a684a6bf94..f79966a021 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -35,7 +35,7 @@ #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) -SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model) { +SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI) { SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); if (pTaskInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -48,6 +48,7 @@ SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOP pTaskInfo->execModel = model; pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES); + pTaskInfo->storageAPI = *pAPI; taosInitRWLatch(&pTaskInfo->lock); @@ -55,7 +56,6 @@ SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOP pTaskInfo->id.queryId = queryId; pTaskInfo->id.str = taosMemoryMalloc(64); buildTaskId(taskId, queryId, pTaskInfo->id.str); - return pTaskInfo; } @@ -78,7 +78,7 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) { - *pTaskInfo = doCreateTask(pPlan->id.queryId, taskId, vgId, model); + *pTaskInfo = doCreateTask(pPlan->id.queryId, taskId, vgId, model, &pHandle->api); if (*pTaskInfo == NULL) { taosMemoryFree(sql); return terrno; @@ -123,7 +123,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo SStorageAPI* pAPI = &pTaskInfo->storageAPI; pAPI->metaReaderFn.initReader(&mr, pHandle->vnode, 0); - int32_t code = pAPI->metaReaderFn.readerGetEntryGetUidCache(&mr, pScanNode->uid); + int32_t code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, pScanNode->uid); if (code != TSDB_CODE_SUCCESS) { qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid, GET_TASKID(pTaskInfo)); @@ -144,7 +144,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo tDecoderClear(&mr.coder); tb_uid_t suid = mr.me.ctbEntry.suid; - code = pAPI->metaReaderFn.readerGetEntryGetUidCache(&mr, suid); + code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, suid); if (code != TSDB_CODE_SUCCESS) { pAPI->metaReaderFn.clearReader(&mr); return terrno; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 23e029e780..83e62faf90 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -532,7 +532,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int // 1. check if it is existed in meta cache if (pCache == NULL) { pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0); - code = pHandle->api.metaReaderFn.readerGetEntryGetUidCache(&mr, pBlock->info.id.uid); + code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid); if (code != TSDB_CODE_SUCCESS) { // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed. if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) { @@ -561,7 +561,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); if (h == NULL) { pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0); - code = pHandle->api.metaReaderFn.readerGetEntryGetUidCache(&mr, pBlock->info.id.uid); + code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid); if (code != TSDB_CODE_SUCCESS) { if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) { qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", @@ -693,7 +693,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { int64_t st = taosGetTimestampUs(); while (true) { - code = pAPI->tsdReader.tsdReaderNextDataBlock(pTableScanInfo->base.dataReader, &hasNext); + code = pAPI->tsdReader.tsdNextDataBlock(pTableScanInfo->base.dataReader, &hasNext); if (code) { pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->base.dataReader); T_LONG_JMP(pTaskInfo->env, code); @@ -975,6 +975,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); + pInfo->readerAPI = pTaskInfo->storageAPI.tsdReader; initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); // blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); @@ -1103,7 +1104,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU } bool hasNext = false; - code = pAPI->tsdReader.tsdReaderNextDataBlock(pReader, &hasNext); + code = pAPI->tsdReader.tsdNextDataBlock(pReader, &hasNext); if (code != TSDB_CODE_SUCCESS) { terrno = code; T_LONG_JMP(pTaskInfo->env, code); @@ -2162,7 +2163,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { bool hasNext = false; if (pInfo->dataReader) { - code = pAPI->tsdReader.tsdReaderNextDataBlock(pInfo->dataReader, &hasNext); + code = pAPI->tsdReader.tsdNextDataBlock(pInfo->dataReader, &hasNext); if (code) { pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader); T_LONG_JMP(pTaskInfo->env, code); @@ -2416,7 +2417,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys // set the extract column id to streamHandle pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds); SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo); - code = pAPI->tqReaderFn.tqReaderSetTargetTableList(pInfo->tqReader, tableIdList); + code = pAPI->tqReaderFn.tqReaderSetQueryTableList(pInfo->tqReader, tableIdList); if (code != 0) { taosArrayDestroy(tableIdList); goto _error; @@ -2457,6 +2458,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->igExpired = pTableScanNode->igExpired; pInfo->twAggSup.maxTs = INT64_MIN; pInfo->pState = NULL; + pInfo->stateStore = pTaskInfo->storageAPI.stateStore; + pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn; // for stream if (pTaskInfo->streamInfo.pState) { @@ -2674,7 +2677,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { qTrace("tsdb/read-table-data: %p, enter next reader", reader); while (true) { - code = pAPI->tsdReader.tsdReaderNextDataBlock(reader, &hasNext); + code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); if (code != 0) { pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); pInfo->base.dataReader = NULL; @@ -3354,7 +3357,7 @@ static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCount SStorageAPI* pAPI = &pTaskInfo->storageAPI; // get dbname - pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode, &db, &vgId); + pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL); SName sn = {0}; tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB); tNameGetDbName(&sn, dbName); @@ -3396,7 +3399,7 @@ static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountSca pRes->info.id.groupId = groupId; int64_t dbTableCount = 0; - pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode, &dbTableCount); + pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, &dbTableCount, NULL); fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes); setOperatorCompleted(pOperator); } @@ -3411,20 +3414,21 @@ static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanO if (strlen(pSupp->stbNameFilter) != 0) { tb_uid_t uid = 0; pAPI->metaFn.getTableUidByName(pInfo->readHandle.vnode, pSupp->stbNameFilter, &uid); - SMetaStbStats stats = {0}; - ASSERT(0); -// metaGetStbStats(pInfo->readHandle.vnode, uid, &stats); - int64_t ctbNum = stats.ctbNum; - fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, ctbNum, pRes); + + int64_t numOfChildTables = 0; + pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, uid, &numOfChildTables); + + fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, numOfChildTables, pRes); } else { int64_t tbNumVnode = 0;//metaGetTbNum(pInfo->readHandle.vnode); fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes); } } else { - int64_t tbNumVnode = 0;//metaGetTbNum(pInfo->readHandle.vnode); - pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode); + int64_t tbNumVnode = 0; + pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, &tbNumVnode, NULL); fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes); } + setOperatorCompleted(pOperator); } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index a9450fb183..16f2b64c07 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -157,7 +157,7 @@ int32_t sysFilte__DbName(void* arg, SNode* pNode, SArray* result) { const char* db = NULL; ASSERT(0); -// pAPI->metaFn.storeGetBasicInfo(pVnode, &db, NULL); +// pAPI->metaFn.getBasicInfo(pVnode, &db, NULL); SName sn = {0}; char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; @@ -185,7 +185,7 @@ int32_t sysFilte__VgroupId(void* arg, SNode* pNode, SArray* result) { int64_t vgId = 0; ASSERT(0); -// pAPI->metaFn.storeGetBasicInfo(pVnode, NULL, (int32_t*)&vgId); +// pAPI->metaFn.getBasicInfo(pVnode, NULL, (int32_t*)&vgId); SOperatorNode* pOper = (SOperatorNode*)pNode; SValueNode* pVal = (SValueNode*)pOper->pRight; @@ -451,7 +451,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { const char* db = NULL; int32_t vgId = 0; - pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode, &db, &vgId); + pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL); SName sn = {0}; char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; @@ -522,7 +522,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { int32_t ret = 0; if (pInfo->pCur == NULL) { - pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.vnode); + pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode); } if (pInfo->pSchema == NULL) { @@ -614,7 +614,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { blockDataDestroy(dataBlock); if (ret != 0) { - pAPI->metaFn.closeMetaCursor(pInfo->pCur); + pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; setOperatorCompleted(pOperator); } @@ -642,7 +642,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { const char* db = NULL; int32_t vgId = 0; - pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode, &db, &vgId); + pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL); SName sn = {0}; char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; @@ -702,7 +702,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { int32_t ret = 0; if (pInfo->pCur == NULL) { - pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.vnode); + pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode); } bool blockFull = false; @@ -722,7 +722,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno), GET_TASKID(pTaskInfo)); pAPI->metaReaderFn.clearReader(&smrSuperTable); - pAPI->metaFn.closeMetaCursor(pInfo->pCur); + pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; T_LONG_JMP(pTaskInfo->env, terrno); } @@ -756,7 +756,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { blockDataDestroy(dataBlock); if (ret != 0) { - pAPI->metaFn.closeMetaCursor(pInfo->pCur); + pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; setOperatorCompleted(pOperator); } @@ -1113,7 +1113,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { const char* db = NULL; int32_t vgId = 0; - pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode, &db, &vgId); + pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL); SName sn = {0}; char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; @@ -1292,7 +1292,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { SSysTableScanInfo* pInfo = pOperator->info; if (pInfo->pCur == NULL) { - pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.vnode); + pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode); } blockDataCleanup(pInfo->pRes); @@ -1300,7 +1300,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { const char* db = NULL; int32_t vgId = 0; - pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode, &db, &vgId); + pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL); SName sn = {0}; char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; @@ -1346,7 +1346,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { qError("failed to get super table meta, cname:%s, suid:0x%" PRIx64 ", code:%s, %s", pInfo->pCur->mr.me.name, suid, tstrerror(terrno), GET_TASKID(pTaskInfo)); pAPI->metaReaderFn.clearReader(&mr); - pAPI->metaFn.closeMetaCursor(pInfo->pCur); + pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; T_LONG_JMP(pTaskInfo->env, terrno); } @@ -1456,8 +1456,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found if (ret != 0) { - pAPI->metaFn.closeMetaCursor(pInfo->pCur); - pAPI->metaFn.closeMetaCursor(pInfo->pCur); + pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; setOperatorCompleted(pOperator); } @@ -1741,6 +1740,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan extractTbnameSlotId(pInfo, pScanNode); + pInfo->pAPI = &pTaskInfo->storageAPI; + pInfo->accountId = pScanPhyNode->accountId; pInfo->pUser = taosStrdup((void*)pUser); pInfo->sysInfo = pScanPhyNode->sysInfo; @@ -1813,7 +1814,7 @@ void destroySysScanOperator(void* param) { if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) { - pInfo->pAPI->metaFn.closeMetaCursor(pInfo->pCur); + pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; } if (pInfo->pIdx) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b4fd571e23..2d366b6110 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2804,6 +2804,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); pInfo->dataVersion = 0; + pInfo->statestore = pTaskInfo->storageAPI.stateStore; pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; @@ -4938,7 +4939,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; initResultSizeInfo(&pOperator->resultInfo, 4096); - pInfo->pState = taosMemoryCalloc(1, sizeof(void)); + pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); @@ -4986,6 +4987,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); + pInfo->statestore = pTaskInfo->storageAPI.stateStore; initIntervalDownStream(downstream, pPhyNode->type, pInfo); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 98b685d8b9..24339337b7 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -91,13 +91,14 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { return winKeyCmprImpl(&pWin1->key, &pWin2->key); } -SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) { - qWarn("open stream state, %s", path); +SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) { + qDebug("open stream state, %s", path); SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); if (pState == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState)); if (pState->pTdbState == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -105,29 +106,33 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int return NULL; } + SStreamTask* pStreamTask = pTask; char statePath[1024]; if (!specPath) { - sprintf(statePath, "%s/%d", path, pTask->id.taskId); + sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId); } else { memset(statePath, 0, 1024); tstrncpy(statePath, path, 1024); } - pState->taskId = pTask->id.taskId; - pState->streamId = pTask->id.streamId; + + pState->taskId = pStreamTask->id.taskId; + pState->streamId = pStreamTask->id.streamId; + #ifdef USE_ROCKSDB - // qWarn("open stream state1"); - taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); - int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState); + SStreamMeta* pMeta = pStreamTask->pMeta; + taosAcquireRef(pMeta->streamBackendId, pMeta->streamBackendRid); + int code = streamStateOpenBackend(pMeta->streamBackend, pState); if (code == -1) { - taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); + taosReleaseRef(pMeta->streamBackendId, pMeta->streamBackendRid); taosMemoryFree(pState); pState = NULL; } + pState->pTdbState->pOwner = pTask; pState->pFileState = NULL; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT); - pState->parNameMap = tSimpleHashInit(1024, hashFn); + pState->parNameMap = tSimpleHashInit(1024, hashFn); return pState; #else @@ -449,7 +454,7 @@ int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pV #ifdef USE_ROCKSDB taosMemoryFree(pVal); #else - streamFreeVal(pVal); + streamStateFreeVal(pVal); #endif return 0; } @@ -700,7 +705,7 @@ void streamStateFreeCur(SStreamStateCur* pCur) { taosMemoryFree(pCur); } -void streamFreeVal(void* val) { +void streamStateFreeVal(void* val) { #ifdef USE_ROCKSDB taosMemoryFree(val); #else