refactor: do some internal refactor and set the api function ptr.
This commit is contained in:
parent
39a1fa8f78
commit
1f7f326bed
|
@ -136,8 +136,8 @@ typedef struct SRowBuffPos {
|
|||
// void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
|
||||
// void metaReaderReleaseLock(SMetaReader *pReader);
|
||||
// void metaReaderClear(SMetaReader *pReader);
|
||||
// int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||
// int32_t metaGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
|
||||
// int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||
// int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
|
||||
// int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
|
||||
// int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList);
|
||||
// int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList);
|
||||
|
@ -191,17 +191,13 @@ typedef struct {
|
|||
// int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||
// bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid);
|
||||
// bool tqCurrentBlockConsumed(const STqReader* pReader);
|
||||
// int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
|
||||
// int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
|
||||
// bool tqNextBlockInWal(STqReader* pReader, const char* idstr);
|
||||
// bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
|
||||
// int32_t getMetafromSnapShot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
|
||||
// SMetaTableInfo getUidfromSnapShot(SSnapContext *ctx);
|
||||
// int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
|
||||
// int32_t destroySnapContext(SSnapContext *ctx);
|
||||
// SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
||||
// void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||
// int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||
// int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||
|
||||
/*-------------------------------------------------new api format---------------------------------------------------*/
|
||||
|
||||
|
@ -229,17 +225,17 @@ typedef struct TsdReader {
|
|||
__store_reader_open_fn_t tsdReaderOpen;
|
||||
void (*tsdReaderClose)();
|
||||
void (*tsdSetReaderTaskId)(void *pReader, const char *pId);
|
||||
void (*tsdSetQueryTableList)();
|
||||
int32_t (*tsdReaderNextDataBlock)();
|
||||
int32_t (*tsdSetQueryTableList)();
|
||||
int32_t (*tsdNextDataBlock)();
|
||||
|
||||
int32_t (*tsdReaderRetrieveBlockSMAInfo)();
|
||||
SSDataBlock *(*tsdReaderRetrieveDataBlock)();
|
||||
|
||||
void (*tsdReaderReleaseDataBlock)();
|
||||
|
||||
void (*tsdReaderResetStatus)();
|
||||
void (*tsdReaderGetDataBlockDistInfo)();
|
||||
void (*tsdReaderGetNumOfInMemRows)();
|
||||
int32_t (*tsdReaderResetStatus)();
|
||||
int32_t (*tsdReaderGetDataBlockDistInfo)();
|
||||
int64_t (*tsdReaderGetNumOfInMemRows)();
|
||||
void (*tsdReaderNotifyClosing)();
|
||||
} TsdReader;
|
||||
|
||||
|
@ -270,13 +266,13 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
|
|||
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid);
|
||||
bool tqCurrentBlockConsumed(const STqReader* pReader);
|
||||
|
||||
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
|
||||
int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
|
||||
bool tqNextBlockInWal(STqReader* pReader, const char* idstr);
|
||||
bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
|
||||
|
||||
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char* idstr);
|
||||
STqReader *tqReaderOpen(void *pVnode);
|
||||
void tqCloseReader(STqReader *);
|
||||
void tqReaderClose(STqReader *);
|
||||
|
||||
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
||||
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
||||
|
@ -285,7 +281,7 @@ int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas
|
|||
*/
|
||||
// todo rename
|
||||
typedef struct SStoreTqReader {
|
||||
void *(*tqReaderOpen)();
|
||||
struct STqReader* (*tqReaderOpen)();
|
||||
void (*tqReaderClose)();
|
||||
|
||||
int32_t (*tqReaderSeek)();
|
||||
|
@ -294,7 +290,7 @@ typedef struct SStoreTqReader {
|
|||
bool (*tqNextBlockImpl)(); // todo remove it
|
||||
|
||||
void (*tqReaderSetColIdList)();
|
||||
int32_t (*tqReaderSetTargetTableList)();
|
||||
int32_t (*tqReaderSetQueryTableList)();
|
||||
|
||||
int32_t (*tqReaderAddTables)();
|
||||
int32_t (*tqReaderRemoveTables)();
|
||||
|
@ -303,10 +299,10 @@ typedef struct SStoreTqReader {
|
|||
bool (*tqReaderCurrentBlockConsumed)();
|
||||
|
||||
struct SWalReader *(*tqReaderGetWalReader)(); // todo remove it
|
||||
void (*tqReaderRetrieveTaosXBlock)(); // todo remove it
|
||||
int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it
|
||||
|
||||
int32_t (*tqReaderSetSubmitMsg)(); // todo remove it
|
||||
void (*tqReaderNextBlockFilterOut)();
|
||||
bool (*tqReaderNextBlockFilterOut)();
|
||||
} SStoreTqReader;
|
||||
|
||||
typedef struct SStoreSnapshotFn {
|
||||
|
@ -326,8 +322,8 @@ typedef struct SStoreSnapshotFn {
|
|||
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
|
||||
void metaReaderReleaseLock(SMetaReader *pReader);
|
||||
void metaReaderClear(SMetaReader *pReader);
|
||||
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||
int32_t metaGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
|
||||
int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||
int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
|
||||
int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
|
||||
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList);
|
||||
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
|
||||
|
@ -347,39 +343,29 @@ int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey,
|
|||
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
|
||||
int32_t payloadLen);
|
||||
*/
|
||||
|
||||
|
||||
typedef struct SStoreMetaReader {
|
||||
void (*initReader)(void *pReader, void *pMeta, int32_t flags);
|
||||
void *(*clearReader)();
|
||||
|
||||
void (*readerReleaseLock)();
|
||||
|
||||
int32_t (*getTableEntryByUid)();
|
||||
int32_t (*getTableEntryByName)();
|
||||
int32_t (*readerGetEntryGetUidCache)(SMetaReader *pReader, tb_uid_t uid);
|
||||
void (*initReader)(SMetaReader *pReader, void *pMeta, int32_t flags);
|
||||
void (*clearReader)(SMetaReader *pReader);
|
||||
void (*readerReleaseLock)(SMetaReader *pReader);
|
||||
int32_t (*getTableEntryByUid)(SMetaReader *pReader, tb_uid_t uid);
|
||||
int32_t (*getTableEntryByName)(SMetaReader *pReader, const char *name);
|
||||
int32_t (*getEntryGetUidCache)(SMetaReader *pReader, tb_uid_t uid);
|
||||
} SStoreMetaReader;
|
||||
|
||||
typedef struct SStoreMeta {
|
||||
/*
|
||||
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
||||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||
*/
|
||||
void *(*openMetaCursor)();
|
||||
void (*closeMetaCursor)();
|
||||
int32_t (*cursorNext)();
|
||||
void (*cursorPrev)();
|
||||
SMTbCursor *(*openTableMetaCursor)(); // metaOpenTbCursor
|
||||
void (*closeTableMetaCursor)(); // metaCloseTbCursor
|
||||
int32_t (*cursorNext)(); // metaTbCursorNext
|
||||
int32_t (*cursorPrev)(); // metaTbCursorPrev
|
||||
|
||||
int32_t (*getTableTags)(void *pVnode, uint64_t suid, SArray *uidList);
|
||||
int32_t (*getTableTagsByUid)();
|
||||
const char *(*extractTagVal)(const void *tag, int16_t type, STagVal *tagVal); // todo remove it
|
||||
const void *(*extractTagVal)(const void *tag, int16_t type, STagVal *tagVal); // todo remove it
|
||||
|
||||
int32_t (*getTableUidByName)(void *pVnode, char *tbName, uint64_t *uid);
|
||||
int32_t (*getTableTypeByName)(void *pVnode, char *tbName, ETableType *tbType);
|
||||
int32_t (*getTableNameByUid)(void *pVnode, uint64_t uid, char *tbName);
|
||||
bool (*isTableExisted)(void *pVnode, uint64_t uid);
|
||||
bool (*isTableExisted)(void *pVnode, tb_uid_t uid);
|
||||
|
||||
/**
|
||||
* int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
|
||||
|
@ -397,8 +383,8 @@ int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, in
|
|||
*
|
||||
*/
|
||||
void *(*storeGetIndexInfo)();
|
||||
void *(*storeGetInvertIndex)();
|
||||
void (*storeGetChildTableList)(); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
|
||||
void *(*getInvertIndex)(void* pVnode);
|
||||
int32_t (*getChildTableList)(void *pVnode, int64_t suid, SArray *list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
|
||||
int32_t (*storeGetTableList)(); // vnodeGetStbIdList & vnodeGetAllTableList
|
||||
void *storeGetVersionRange;
|
||||
void *storeGetLastTimestamp;
|
||||
|
@ -406,8 +392,8 @@ int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, in
|
|||
int32_t (*getTableSchema)(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); // tsdbGetTableSchema
|
||||
|
||||
// db name, vgId, numOfTables, numOfSTables
|
||||
void (*storeGetNumOfChildTables)(); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
|
||||
void (*storeGetBasicInfo)(); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
|
||||
int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
|
||||
void (*getBasicInfo)(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables);// vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
|
||||
|
||||
int64_t (*getNumOfRowsInMem)();
|
||||
/**
|
||||
|
@ -497,50 +483,50 @@ typedef struct SStateStore {
|
|||
int32_t (*streamStateFillGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
||||
int32_t (*streamStateFillDel)(SStreamState* pState, const SWinKey* key);
|
||||
|
||||
int32_t (*streamStateCurNext)(SStreamState* pState, void* pCur);
|
||||
int32_t (*streamStateCurPrev)(SStreamState* pState, void* pCur);
|
||||
int32_t (*streamStateCurNext)(SStreamState* pState, SStreamStateCur* pCur);
|
||||
int32_t (*streamStateCurPrev)(SStreamState* pState, SStreamStateCur* pCur);
|
||||
|
||||
void* (*streamStateGetAndCheckCur)(SStreamState* pState, SWinKey* key);
|
||||
void* (*streamStateSeekKeyNext)(SStreamState* pState, const SWinKey* key);
|
||||
void* (*streamStateFillSeekKeyNext)(SStreamState* pState, const SWinKey* key);
|
||||
void* (*streamStateFillSeekKeyPrev)(SStreamState* pState, const SWinKey* key);
|
||||
void (*streamStateFreeCur)(void* pCur);
|
||||
SStreamStateCur* (*streamStateGetAndCheckCur)(SStreamState* pState, SWinKey* key);
|
||||
SStreamStateCur* (*streamStateSeekKeyNext)(SStreamState* pState, const SWinKey* key);
|
||||
SStreamStateCur* (*streamStateFillSeekKeyNext)(SStreamState* pState, const SWinKey* key);
|
||||
SStreamStateCur* (*streamStateFillSeekKeyPrev)(SStreamState* pState, const SWinKey* key);
|
||||
void (*streamStateFreeCur)(SStreamStateCur* pCur);
|
||||
|
||||
int32_t (*streamStateGetGroupKVByCur)(void* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
int32_t (*streamStateGetKVByCur)(void* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
int32_t (*streamStateGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
|
||||
int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen);
|
||||
int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
|
||||
int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
|
||||
int32_t (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key);
|
||||
int32_t (*streamStateSessionClear)(SStreamState* pState);
|
||||
int32_t (*streamStateSessionGetKVByCur)(void* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||
int32_t (*streamStateSessionGetKVByCur)(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||
int32_t (*streamStateStateAddIfNotExist)(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
|
||||
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
|
||||
int32_t (*streamStateSessionGetKeyByRange)(void* pState, const SSessionKey* range, SSessionKey* curKey);
|
||||
int32_t (*streamStateSessionGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
|
||||
|
||||
void* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark);
|
||||
TSKEY (*updateInfoFillBlockData)(void *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol);
|
||||
bool (*updateInfoIsUpdated)(void *pInfo, uint64_t tableId, TSKEY ts);
|
||||
bool (*updateInfoIsTableInserted)(void *pInfo, int64_t tbUid);
|
||||
void (*updateInfoDestroy)(void *pInfo);
|
||||
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark);
|
||||
TSKEY (*updateInfoFillBlockData)(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol);
|
||||
bool (*updateInfoIsUpdated)(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
||||
bool (*updateInfoIsTableInserted)(SUpdateInfo *pInfo, int64_t tbUid);
|
||||
void (*updateInfoDestroy)(SUpdateInfo *pInfo);
|
||||
|
||||
SUpdateInfo* (*updateInfoInitP)(SInterval *pInterval, int64_t watermark);
|
||||
void (*updateInfoAddCloseWindowSBF)(void *pInfo);
|
||||
void (*updateInfoDestoryColseWinSBF)(void *pInfo);
|
||||
int32_t (*updateInfoSerialize)(void *buf, int32_t bufLen, const void *pInfo);
|
||||
int32_t (*updateInfoDeserialize)(void *buf, int32_t bufLen, void *pInfo);
|
||||
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo *pInfo);
|
||||
void (*updateInfoDestoryColseWinSBF)(SUpdateInfo *pInfo);
|
||||
int32_t (*updateInfoSerialize)(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
|
||||
int32_t (*updateInfoDeserialize)(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
|
||||
|
||||
void* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key);
|
||||
void* (*streamStateSessionSeekKeyCurrentPrev)(SStreamState* pState, const SSessionKey* key);
|
||||
void* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key);
|
||||
SStreamStateCur* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key);
|
||||
SStreamStateCur* (*streamStateSessionSeekKeyCurrentPrev)(SStreamState* pState, const SSessionKey* key);
|
||||
SStreamStateCur* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key);
|
||||
|
||||
void* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp,
|
||||
void* pFile, TSKEY delMark);
|
||||
struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize,
|
||||
uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark);
|
||||
|
||||
void (*streamFileStateDestroy)(void* pFileState);
|
||||
void (*streamFileStateClear)(void* pFileState);
|
||||
bool (*needClearDiskBuff)(void* pFileState);
|
||||
void (*streamFileStateDestroy)(struct SStreamFileState* pFileState);
|
||||
void (*streamFileStateClear)(struct SStreamFileState* pFileState);
|
||||
bool (*needClearDiskBuff)(struct SStreamFileState* pFileState);
|
||||
|
||||
SStreamState* (*streamStateOpen)(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
|
||||
void (*streamStateClose)(SStreamState* pState, bool remove);
|
||||
|
|
|
@ -59,7 +59,7 @@ extern "C" {
|
|||
// TXN* txn;
|
||||
//} STdbState;
|
||||
|
||||
SStreamState* streamStateOpen(char* path, struct SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
|
||||
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
|
||||
void streamStateClose(SStreamState* pState, bool remove);
|
||||
int32_t streamStateBegin(SStreamState* pState);
|
||||
int32_t streamStateCommit(SStreamState* pState);
|
||||
|
|
|
@ -67,17 +67,17 @@ int32_t vnodeStart(SVnode *pVnode);
|
|||
void vnodeStop(SVnode *pVnode);
|
||||
int64_t vnodeGetSyncHandle(SVnode *pVnode);
|
||||
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
|
||||
void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
|
||||
void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables);
|
||||
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
|
||||
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
|
||||
int32_t vnodeIsCatchUp(SVnode *pVnode);
|
||||
ESyncRole vnodeGetRole(SVnode *pVnode);
|
||||
|
||||
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
|
||||
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
|
||||
int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
|
||||
int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray *list);
|
||||
void *vnodeGetIdx(SVnode *pVnode);
|
||||
void *vnodeGetIvtIdx(SVnode *pVnode);
|
||||
void *vnodeGetIdx(void *pVnode);
|
||||
void *vnodeGetIvtIdx(void *pVnode);
|
||||
|
||||
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num);
|
||||
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num);
|
||||
|
@ -105,22 +105,22 @@ typedef struct SMetaEntry SMetaEntry;
|
|||
|
||||
#define META_READER_NOLOCK 0x1
|
||||
|
||||
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
|
||||
void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags);
|
||||
void metaReaderReleaseLock(SMetaReader *pReader);
|
||||
void metaReaderClear(SMetaReader *pReader);
|
||||
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||
int32_t metaGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
|
||||
int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||
int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
|
||||
int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
|
||||
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList);
|
||||
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList);
|
||||
int32_t metaReadNext(SMetaReader *pReader);
|
||||
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
|
||||
const void *metaGetTableTagVal(const void *tag, int16_t type, STagVal *tagVal);
|
||||
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
|
||||
|
||||
int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName);
|
||||
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
|
||||
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
|
||||
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
|
||||
bool metaIsTableExist(void* pVnode, tb_uid_t uid);
|
||||
int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
|
||||
bool *acquired);
|
||||
int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
|
||||
|
@ -138,7 +138,7 @@ int64_t metaGetNtbNum(SMeta *pMeta);
|
|||
// int64_t uid;
|
||||
// int64_t ctbNum;
|
||||
//} SMetaStbStats;
|
||||
int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
|
||||
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables);
|
||||
|
||||
//typedef struct SMetaFltParam {
|
||||
// tb_uid_t suid;
|
||||
|
@ -163,7 +163,7 @@ typedef SVCreateTSmaReq SSmaCfg;
|
|||
|
||||
typedef struct SMTbCursor SMTbCursor;
|
||||
|
||||
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
||||
SMTbCursor *metaOpenTbCursor(void *pVnode);
|
||||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||
|
@ -260,16 +260,20 @@ typedef struct STqReader {
|
|||
} STqReader;
|
||||
|
||||
STqReader *tqReaderOpen(SVnode *pVnode);
|
||||
void tqCloseReader(STqReader *);
|
||||
void tqReaderClose(STqReader *);
|
||||
|
||||
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
|
||||
int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
|
||||
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||
|
||||
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
|
||||
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid);
|
||||
bool tqCurrentBlockConsumed(const STqReader* pReader);
|
||||
|
||||
int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
|
||||
bool tqNextBlockInWal(STqReader *pReader, const char *idstr);
|
||||
bool tqNextBlockImpl(STqReader *pReader, const char *idstr);
|
||||
SWalReader* tqGetWalReader(STqReader* pReader);
|
||||
|
||||
int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, const char *id);
|
||||
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
||||
|
|
|
@ -161,6 +161,8 @@ void* metaGetIdx(SMeta* pMeta);
|
|||
void* metaGetIvtIdx(SMeta* pMeta);
|
||||
int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
|
||||
|
||||
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
|
||||
|
||||
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
|
||||
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
|
||||
|
||||
|
@ -469,6 +471,8 @@ struct SCompactInfo {
|
|||
STimeWindow tw;
|
||||
};
|
||||
|
||||
void initStorageAPI(SStorageAPI* pAPI);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -17,6 +17,11 @@
|
|||
#include "osMemory.h"
|
||||
#include "tencode.h"
|
||||
|
||||
void _metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags) {
|
||||
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
|
||||
metaReaderInit(pReader, pMeta, flags);
|
||||
}
|
||||
|
||||
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
|
||||
memset(pReader, 0, sizeof(*pReader));
|
||||
pReader->flags = flags;
|
||||
|
@ -64,96 +69,20 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
// int metaGetTableEntryByUidTest(void* meta, SArray *uidList) {
|
||||
//
|
||||
// SArray* readerList = taosArrayInit(taosArrayGetSize(uidList), sizeof(SMetaReader));
|
||||
// SArray* uidVersion = taosArrayInit(taosArrayGetSize(uidList), sizeof(STbDbKey));
|
||||
// SMeta *pMeta = meta;
|
||||
// int64_t version;
|
||||
// SHashObj *uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
//
|
||||
// int64_t stt1 = taosGetTimestampUs();
|
||||
// for(int i = 0; i < taosArrayGetSize(uidList); i++) {
|
||||
// void* ppVal = NULL;
|
||||
// int vlen = 0;
|
||||
// uint64_t * uid = taosArrayGet(uidList, i);
|
||||
// // query uid.idx
|
||||
// if (tdbTbGet(pMeta->pUidIdx, uid, sizeof(*uid), &ppVal, &vlen) < 0) {
|
||||
// continue;
|
||||
// }
|
||||
// version = *(int64_t *)ppVal;
|
||||
//
|
||||
// STbDbKey tbDbKey = {.version = version, .uid = *uid};
|
||||
// taosArrayPush(uidVersion, &tbDbKey);
|
||||
// taosHashPut(uHash, uid, sizeof(int64_t), ppVal, sizeof(int64_t));
|
||||
// }
|
||||
// int64_t stt2 = taosGetTimestampUs();
|
||||
// qDebug("metaGetTableEntryByUidTest1 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt2-stt1);
|
||||
//
|
||||
// TBC *pCur = NULL;
|
||||
// tdbTbcOpen(pMeta->pTbDb, &pCur, NULL);
|
||||
// tdbTbcMoveToFirst(pCur);
|
||||
// void *pKey = NULL;
|
||||
// int kLen = 0;
|
||||
//
|
||||
// while(1){
|
||||
// SMetaReader pReader = {0};
|
||||
// int32_t ret = tdbTbcNext(pCur, &pKey, &kLen, &pReader.pBuf, &pReader.szBuf);
|
||||
// if (ret < 0) break;
|
||||
// STbDbKey *tmp = (STbDbKey*)pKey;
|
||||
// int64_t *ver = (int64_t*)taosHashGet(uHash, &tmp->uid, sizeof(int64_t));
|
||||
// if(ver == NULL || *ver != tmp->version) continue;
|
||||
// taosArrayPush(readerList, &pReader);
|
||||
// }
|
||||
// tdbTbcClose(pCur);
|
||||
//
|
||||
// taosArrayClear(readerList);
|
||||
// int64_t stt3 = taosGetTimestampUs();
|
||||
// qDebug("metaGetTableEntryByUidTest2 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt3-stt2);
|
||||
// for(int i = 0; i < taosArrayGetSize(uidVersion); i++) {
|
||||
// SMetaReader pReader = {0};
|
||||
//
|
||||
// STbDbKey *tbDbKey = taosArrayGet(uidVersion, i);
|
||||
// // query table.db
|
||||
// if (tdbTbGet(pMeta->pTbDb, tbDbKey, sizeof(STbDbKey), &pReader.pBuf, &pReader.szBuf) < 0) {
|
||||
// continue;
|
||||
// }
|
||||
// taosArrayPush(readerList, &pReader);
|
||||
// }
|
||||
// int64_t stt4 = taosGetTimestampUs();
|
||||
// qDebug("metaGetTableEntryByUidTest3 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt4-stt3);
|
||||
//
|
||||
// for(int i = 0; i < taosArrayGetSize(readerList); i++){
|
||||
// SMetaReader* pReader = taosArrayGet(readerList, i);
|
||||
// metaReaderInit(pReader, meta, 0);
|
||||
// // decode the entry
|
||||
// tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
|
||||
//
|
||||
// if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
|
||||
// }
|
||||
// metaReaderClear(pReader);
|
||||
// }
|
||||
// int64_t stt5 = taosGetTimestampUs();
|
||||
// qDebug("metaGetTableEntryByUidTest4 rows:%d, cost:%ld us", taosArrayGetSize(readerList), stt5-stt4);
|
||||
// return 0;
|
||||
// }
|
||||
|
||||
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) {
|
||||
// query uid.idx
|
||||
metaRLock(pMeta);
|
||||
|
||||
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) < 0) {
|
||||
metaULock(pMeta);
|
||||
bool metaIsTableExist(void *pVnode, tb_uid_t uid) {
|
||||
SVnode* pVnodeObj = pVnode;
|
||||
metaRLock(pVnodeObj->pMeta); // query uid.idx
|
||||
|
||||
if (tdbTbGet(pVnodeObj->pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) < 0) {
|
||||
metaULock(pVnodeObj->pMeta);
|
||||
return false;
|
||||
}
|
||||
|
||||
metaULock(pMeta);
|
||||
|
||||
metaULock(pVnodeObj->pMeta);
|
||||
return true;
|
||||
}
|
||||
|
||||
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
||||
int metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
||||
SMeta *pMeta = pReader->pMeta;
|
||||
int64_t version1;
|
||||
|
||||
|
@ -167,7 +96,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
|||
return metaGetTableEntryByVersion(pReader, version1, uid);
|
||||
}
|
||||
|
||||
int metaGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid) {
|
||||
int metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid) {
|
||||
SMeta *pMeta = pReader->pMeta;
|
||||
|
||||
SMetaInfo info;
|
||||
|
@ -190,7 +119,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
|
|||
}
|
||||
|
||||
uid = *(tb_uid_t *)pReader->pBuf;
|
||||
return metaGetTableEntryByUid(pReader, uid);
|
||||
return metaReaderGetTableEntryByUid(pReader, uid);
|
||||
}
|
||||
|
||||
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
|
||||
|
@ -214,7 +143,7 @@ int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName) {
|
|||
int code = 0;
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, (SMeta *)meta, 0);
|
||||
code = metaGetTableEntryByUid(&mr, uid);
|
||||
code = metaReaderGetTableEntryByUid(&mr, uid);
|
||||
if (code < 0) {
|
||||
metaReaderClear(&mr);
|
||||
return -1;
|
||||
|
@ -230,7 +159,7 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) {
|
|||
int code = 0;
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, (SMeta *)meta, 0);
|
||||
code = metaGetTableEntryByUid(&mr, uid);
|
||||
code = metaReaderGetTableEntryByUid(&mr, uid);
|
||||
if (code < 0) {
|
||||
metaReaderClear(&mr);
|
||||
return -1;
|
||||
|
@ -283,7 +212,7 @@ int metaReadNext(SMetaReader *pReader) {
|
|||
}
|
||||
|
||||
#if 1 // ===================================================
|
||||
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
|
||||
SMTbCursor *metaOpenTbCursor(void *pVnode) {
|
||||
SMTbCursor *pTbCur = NULL;
|
||||
|
||||
pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur));
|
||||
|
@ -291,12 +220,12 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
metaReaderInit(&pTbCur->mr, pMeta, 0);
|
||||
SVnode* pVnodeObj = pVnode;
|
||||
metaReaderInit(&pTbCur->mr, pVnodeObj->pMeta, 0);
|
||||
|
||||
tdbTbcOpen(pMeta->pUidIdx, (TBC **)&pTbCur->pDbc, NULL);
|
||||
tdbTbcOpen(pVnodeObj->pMeta->pUidIdx, (TBC **)&pTbCur->pDbc, NULL);
|
||||
|
||||
tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
|
||||
|
||||
return pTbCur;
|
||||
}
|
||||
|
||||
|
@ -876,7 +805,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
|
|||
STSma *pTSma = NULL;
|
||||
for (int i = 0; i < pSW->number; ++i) {
|
||||
smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
|
||||
if (metaGetTableEntryByUid(&mr, smaId) < 0) {
|
||||
if (metaReaderGetTableEntryByUid(&mr, smaId) < 0) {
|
||||
tDecoderClear(&mr.coder);
|
||||
metaWarn("vgId:%d, no entry for tbId:%" PRIi64 ", smaId:%" PRIi64, TD_VID(pMeta->pVnode), uid, smaId);
|
||||
continue;
|
||||
|
@ -926,7 +855,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
|
|||
STSma *pTSma = NULL;
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pMeta, 0);
|
||||
if (metaGetTableEntryByUid(&mr, indexUid) < 0) {
|
||||
if (metaReaderGetTableEntryByUid(&mr, indexUid) < 0) {
|
||||
metaWarn("vgId:%d, failed to get table entry for smaId:%" PRIi64, TD_VID(pMeta->pVnode), indexUid);
|
||||
metaReaderClear(&mr);
|
||||
return NULL;
|
||||
|
@ -1027,7 +956,7 @@ SArray *metaGetSmaTbUids(SMeta *pMeta) {
|
|||
|
||||
#endif
|
||||
|
||||
const void *metaGetTableTagVal(void *pTag, int16_t type, STagVal *val) {
|
||||
const void *metaGetTableTagVal(const void *pTag, int16_t type, STagVal *val) {
|
||||
STag *tag = (STag *)pTag;
|
||||
if (type == TSDB_DATA_TYPE_JSON) {
|
||||
return tag;
|
||||
|
@ -1565,30 +1494,35 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo) {
|
||||
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t* numOfTables) {
|
||||
int32_t code = 0;
|
||||
*numOfTables = 0;
|
||||
|
||||
metaRLock(pMeta);
|
||||
SVnode* pVnodeObj = pVnode;
|
||||
metaRLock(pVnodeObj->pMeta);
|
||||
|
||||
// fast path: search cache
|
||||
if (metaStatsCacheGet(pMeta, uid, pInfo) == TSDB_CODE_SUCCESS) {
|
||||
metaULock(pMeta);
|
||||
SMetaStbStats state = {0};
|
||||
if (metaStatsCacheGet(pVnodeObj->pMeta, uid, &state) == TSDB_CODE_SUCCESS) {
|
||||
metaULock(pVnodeObj->pMeta);
|
||||
*numOfTables = state.ctbNum;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// slow path: search TDB
|
||||
int64_t ctbNum = 0;
|
||||
vnodeGetCtbNum(pMeta->pVnode, uid, &ctbNum);
|
||||
vnodeGetCtbNum(pVnode, uid, &ctbNum);
|
||||
|
||||
metaULock(pMeta);
|
||||
metaULock(pVnodeObj->pMeta);
|
||||
*numOfTables = ctbNum;
|
||||
|
||||
pInfo->uid = uid;
|
||||
pInfo->ctbNum = ctbNum;
|
||||
state.uid = uid;
|
||||
state.ctbNum = ctbNum;
|
||||
|
||||
// upsert the cache
|
||||
metaWLock(pMeta);
|
||||
metaStatsCacheUpsert(pMeta, pInfo);
|
||||
metaULock(pMeta);
|
||||
metaWLock(pVnodeObj->pMeta);
|
||||
metaStatsCacheUpsert(pVnodeObj->pMeta, &state);
|
||||
metaULock(pVnodeObj->pMeta);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
|
|
@ -36,7 +36,7 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
|
|||
// validate req
|
||||
// save smaIndex
|
||||
metaReaderInit(&mr, pMeta, 0);
|
||||
if (metaGetTableEntryByUidCache(&mr, pCfg->indexUid) == 0) {
|
||||
if (metaReaderGetTableEntryByUidCache(&mr, pCfg->indexUid) == 0) {
|
||||
#if 1
|
||||
terrno = TSDB_CODE_TSMA_ALREADY_EXIST;
|
||||
metaReaderClear(&mr);
|
||||
|
|
|
@ -690,7 +690,7 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) {
|
||||
int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) {
|
||||
SMetaEntry me = {0};
|
||||
SMetaReader mr = {0};
|
||||
|
||||
|
@ -729,7 +729,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMe
|
|||
metaReaderClear(&mr);
|
||||
|
||||
// build SMetaEntry
|
||||
me.version = version;
|
||||
me.version = ver;
|
||||
me.type = pReq->type;
|
||||
me.uid = pReq->uid;
|
||||
me.name = pReq->name;
|
||||
|
|
|
@ -894,7 +894,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
|
|||
|
||||
metaReaderInit(&mr, SMA_META(pSma), 0);
|
||||
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
|
||||
if (metaGetTableEntryByUidCache(&mr, pInfo->suid) < 0) {
|
||||
if (metaReaderGetTableEntryByUidCache(&mr, pInfo->suid) < 0) {
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
@ -1121,7 +1121,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
|
|||
for (int64_t i = 0; i < arrSize; ++i) {
|
||||
suid = *(tb_uid_t *)taosArrayGet(suidList, i);
|
||||
smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid);
|
||||
if (metaGetTableEntryByUidCache(&mr, suid) < 0) {
|
||||
if (metaReaderGetTableEntryByUidCache(&mr, suid) < 0) {
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
|
|
@ -69,12 +69,12 @@ static void destroyTqHandle(void* data) {
|
|||
if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
|
||||
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||
tqCloseReader(pData->execHandle.pTqReader);
|
||||
tqReaderClose(pData->execHandle.pTqReader);
|
||||
walCloseReader(pData->pWalReader);
|
||||
taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
|
||||
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
walCloseReader(pData->pWalReader);
|
||||
tqCloseReader(pData->execHandle.pTqReader);
|
||||
tqReaderClose(pData->execHandle.pTqReader);
|
||||
}
|
||||
if(pData->msg != NULL) {
|
||||
rpcFreeCont(pData->msg->pCont);
|
||||
|
@ -787,6 +787,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
}
|
||||
|
||||
SReadHandle handle = {.vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};
|
||||
initStorageAPI(&handle.api);
|
||||
|
||||
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
|
||||
if (pTask->exec.pExecutor == NULL) {
|
||||
|
|
|
@ -273,7 +273,7 @@ STqReader* tqReaderOpen(SVnode* pVnode) {
|
|||
return pReader;
|
||||
}
|
||||
|
||||
void tqCloseReader(STqReader* pReader) {
|
||||
void tqReaderClose(STqReader* pReader) {
|
||||
// close wal reader
|
||||
if (pReader->pWalReader) {
|
||||
walCloseReader(pReader->pWalReader);
|
||||
|
@ -294,7 +294,7 @@ void tqCloseReader(STqReader* pReader) {
|
|||
taosMemoryFree(pReader);
|
||||
}
|
||||
|
||||
int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
|
||||
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
|
||||
if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -605,6 +605,8 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
|
|||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
|
||||
|
||||
SSDataBlock* pBlock = pReader->pResBlock;
|
||||
*pRes = pBlock;
|
||||
|
||||
blockDataCleanup(pBlock);
|
||||
|
||||
int32_t sversion = pSubmitTbData->sver;
|
||||
|
@ -1084,7 +1086,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
|||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); ++i) {
|
||||
uint64_t* id = (uint64_t*)taosArrayGet(tbUidList, i);
|
||||
|
||||
int32_t code = metaGetTableEntryByUidCache(&mr, *id);
|
||||
int32_t code = metaReaderGetTableEntryByUidCache(&mr, *id);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
|
||||
continue;
|
||||
|
|
|
@ -51,7 +51,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in
|
|||
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
|
||||
|
||||
// TODO add reference to gurantee success
|
||||
if (metaGetTableEntryByUidCache(&mr, uid) < 0) {
|
||||
if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
|
||||
metaReaderClear(&mr);
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -980,7 +980,7 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
|
|||
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
|
||||
if (metaGetTableEntryByUidCache(&mr, uid) < 0) {
|
||||
if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
|
||||
metaReaderClear(&mr); // table not esist
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -5376,7 +5376,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
|
|||
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pVnode->pMeta, 0);
|
||||
int32_t code = metaGetTableEntryByUidCache(&mr, uid);
|
||||
int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
metaReaderClear(&mr);
|
||||
|
@ -5389,7 +5389,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
|
|||
if (mr.me.type == TSDB_CHILD_TABLE) {
|
||||
tDecoderClear(&mr.coder);
|
||||
*suid = mr.me.ctbEntry.suid;
|
||||
code = metaGetTableEntryByUidCache(&mr, *suid);
|
||||
code = metaReaderGetTableEntryByUidCache(&mr, *suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
metaReaderClear(&mr);
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "vnd.h"
|
||||
#include "tstreamUpdate.h"
|
||||
|
||||
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
||||
SVnodeInfo info = {0};
|
||||
|
@ -444,3 +445,177 @@ void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) {
|
|||
pSnapshot->lastApplyTerm = pVnode->state.commitTerm;
|
||||
pSnapshot->lastConfigIndex = -1;
|
||||
}
|
||||
|
||||
static void initTsdbReaderAPI(TsdReader* pReader);
|
||||
static void initMetadataAPI(SStoreMeta* pMeta);
|
||||
static void initTqAPI(SStoreTqReader* pTq);
|
||||
static void initStateStoreAPI(SStateStore* pStore);
|
||||
static void initMetaReaderAPI(SStoreMetaReader* pMetaReader);
|
||||
|
||||
void initStorageAPI(SStorageAPI* pAPI) {
|
||||
initTsdbReaderAPI(&pAPI->tsdReader);
|
||||
initMetadataAPI(&pAPI->metaFn);
|
||||
initTqAPI(&pAPI->tqReaderFn);
|
||||
initStateStoreAPI(&pAPI->stateStore);
|
||||
initMetaReaderAPI(&pAPI->metaReaderFn);
|
||||
}
|
||||
|
||||
void initTsdbReaderAPI(TsdReader* pReader) {
|
||||
pReader->tsdReaderOpen = (__store_reader_open_fn_t)tsdbReaderOpen;
|
||||
pReader->tsdReaderClose = tsdbReaderClose;
|
||||
|
||||
pReader->tsdNextDataBlock = tsdbNextDataBlock;
|
||||
|
||||
pReader->tsdReaderRetrieveDataBlock = tsdbRetrieveDataBlock;
|
||||
pReader->tsdReaderReleaseDataBlock = tsdbReleaseDataBlock;
|
||||
|
||||
pReader->tsdReaderRetrieveBlockSMAInfo = tsdbRetrieveDatablockSMA;
|
||||
|
||||
pReader->tsdReaderNotifyClosing = tsdbReaderSetCloseFlag;
|
||||
pReader->tsdReaderResetStatus = tsdbReaderReset;
|
||||
|
||||
pReader->tsdReaderGetDataBlockDistInfo = tsdbGetFileBlocksDistInfo;
|
||||
pReader->tsdReaderGetNumOfInMemRows = tsdbGetNumOfRowsInMemTable; // todo this function should be moved away
|
||||
|
||||
pReader->tsdSetQueryTableList = tsdbSetTableList;
|
||||
pReader->tsdSetReaderTaskId = (void (*)(void *, const char *))tsdbReaderSetId;
|
||||
}
|
||||
|
||||
void initMetadataAPI(SStoreMeta* pMeta) {
|
||||
pMeta->isTableExisted = metaIsTableExist;
|
||||
|
||||
pMeta->openTableMetaCursor = metaOpenTbCursor;
|
||||
pMeta->closeTableMetaCursor = metaCloseTbCursor;
|
||||
pMeta->cursorNext = metaTbCursorNext;
|
||||
pMeta->cursorPrev = metaTbCursorPrev;
|
||||
|
||||
pMeta->getBasicInfo = vnodeGetInfo;
|
||||
pMeta->getNumOfChildTables = metaGetStbStats;
|
||||
|
||||
pMeta->getChildTableList = vnodeGetCtbIdList;
|
||||
|
||||
pMeta->storeGetIndexInfo = vnodeGetIdx;
|
||||
pMeta->getInvertIndex = vnodeGetIvtIdx;
|
||||
|
||||
pMeta->extractTagVal = (const void *(*)(const void *, int16_t, STagVal *))metaGetTableTagVal;
|
||||
|
||||
}
|
||||
|
||||
void initTqAPI(SStoreTqReader* pTq) {
|
||||
pTq->tqReaderOpen = tqReaderOpen;
|
||||
pTq->tqReaderSetColIdList = tqReaderSetColIdList;
|
||||
|
||||
pTq->tqReaderClose = tqReaderClose;
|
||||
pTq->tqReaderSeek = tqReaderSeek;
|
||||
pTq->tqRetrieveBlock = tqRetrieveDataBlock;
|
||||
|
||||
pTq->tqReaderNextBlockInWal = tqNextBlockInWal;
|
||||
|
||||
pTq->tqNextBlockImpl = tqNextBlockImpl;// todo remove it
|
||||
|
||||
pTq->tqReaderAddTables = tqReaderAddTbUidList;
|
||||
pTq->tqReaderSetQueryTableList = tqReaderSetTbUidList;
|
||||
|
||||
pTq->tqReaderRemoveTables = tqReaderRemoveTbUidList;
|
||||
|
||||
pTq->tqReaderIsQueriedTable = tqReaderIsQueriedTable;
|
||||
pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed;
|
||||
|
||||
pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it
|
||||
pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it
|
||||
|
||||
pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it
|
||||
pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
|
||||
}
|
||||
|
||||
void initStateStoreAPI(SStateStore* pStore) {
|
||||
pStore->streamFileStateInit = streamFileStateInit;
|
||||
pStore->updateInfoDestoryColseWinSBF = updateInfoDestoryColseWinSBF;
|
||||
|
||||
pStore->streamStateGetByPos = streamStateGetByPos;
|
||||
|
||||
pStore->streamStatePutParName = streamStatePutParName;
|
||||
pStore->streamStateGetParName = streamStateGetParName;
|
||||
|
||||
pStore->streamStateAddIfNotExist = streamStateAddIfNotExist;
|
||||
pStore->streamStateReleaseBuf = streamStateReleaseBuf;
|
||||
pStore->streamStateFreeVal = streamStateFreeVal;
|
||||
|
||||
pStore->streamStatePut = streamStatePut;
|
||||
pStore->streamStateGet = streamStateGet;
|
||||
pStore->streamStateCheck = streamStateCheck;
|
||||
pStore->streamStateGetByPos = streamStateGetByPos;
|
||||
pStore->streamStateDel = streamStateDel;
|
||||
pStore->streamStateClear = streamStateClear;
|
||||
pStore->streamStateSaveInfo = streamStateSaveInfo;
|
||||
pStore->streamStateGetInfo = streamStateGetInfo;
|
||||
pStore->streamStateSetNumber = streamStateSetNumber;
|
||||
|
||||
pStore->streamStateFillPut = streamStateFillPut;
|
||||
pStore->streamStateFillGet = streamStateFillGet;
|
||||
pStore->streamStateFillDel = streamStateFillDel;
|
||||
|
||||
pStore->streamStateCurNext = streamStateCurNext;
|
||||
pStore->streamStateCurPrev = streamStateCurPrev;
|
||||
|
||||
pStore->streamStateGetAndCheckCur = streamStateGetAndCheckCur;
|
||||
pStore->streamStateSeekKeyNext = streamStateSeekKeyNext;
|
||||
pStore->streamStateFillSeekKeyNext = streamStateFillSeekKeyNext;
|
||||
pStore->streamStateFillSeekKeyPrev = streamStateFillSeekKeyPrev;
|
||||
pStore->streamStateFreeCur = streamStateFreeCur;
|
||||
|
||||
pStore->streamStateGetGroupKVByCur = streamStateGetGroupKVByCur;
|
||||
pStore->streamStateGetKVByCur = streamStateGetKVByCur;
|
||||
|
||||
pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist;
|
||||
pStore->streamStateSessionPut = streamStateSessionPut;
|
||||
pStore->streamStateSessionGet = streamStateSessionGet;
|
||||
pStore->streamStateSessionDel = streamStateSessionDel;
|
||||
pStore->streamStateSessionClear = streamStateSessionClear;
|
||||
pStore->streamStateSessionGetKVByCur = streamStateSessionGetKVByCur;
|
||||
pStore->streamStateStateAddIfNotExist = streamStateStateAddIfNotExist;
|
||||
pStore->streamStateSessionGetKeyByRange = streamStateSessionGetKeyByRange;
|
||||
|
||||
pStore->updateInfoInit = updateInfoInit;
|
||||
pStore->updateInfoFillBlockData = updateInfoFillBlockData;
|
||||
pStore->updateInfoIsUpdated = updateInfoIsUpdated;
|
||||
pStore->updateInfoIsTableInserted = updateInfoIsTableInserted;
|
||||
pStore->updateInfoDestroy = updateInfoDestroy;
|
||||
|
||||
pStore->updateInfoInitP = updateInfoInitP;
|
||||
pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF;
|
||||
pStore->updateInfoDestoryColseWinSBF = updateInfoDestoryColseWinSBF;
|
||||
pStore->updateInfoSerialize = updateInfoSerialize;
|
||||
pStore->updateInfoDeserialize = updateInfoDeserialize;
|
||||
|
||||
pStore->streamStateSessionSeekKeyNext = streamStateSessionSeekKeyNext;
|
||||
pStore->streamStateSessionSeekKeyCurrentPrev = streamStateSessionSeekKeyCurrentPrev;
|
||||
pStore->streamStateSessionSeekKeyCurrentNext = streamStateSessionSeekKeyCurrentNext;
|
||||
|
||||
pStore->streamFileStateInit = streamFileStateInit;
|
||||
|
||||
pStore->streamFileStateDestroy = streamFileStateDestroy;
|
||||
pStore->streamFileStateClear = streamFileStateClear;
|
||||
pStore->needClearDiskBuff = needClearDiskBuff;
|
||||
|
||||
pStore->streamStateOpen = streamStateOpen;
|
||||
pStore->streamStateClose = streamStateClose;
|
||||
pStore->streamStateBegin = streamStateBegin;
|
||||
pStore->streamStateCommit = streamStateCommit;
|
||||
pStore->streamStateDestroy= streamStateDestroy;
|
||||
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
|
||||
}
|
||||
|
||||
void initMetaReaderAPI(SStoreMetaReader* pMetaReader) {
|
||||
pMetaReader->initReader = _metaReaderInit;
|
||||
pMetaReader->clearReader = metaReaderClear;
|
||||
|
||||
pMetaReader->getTableEntryByUid = metaReaderGetTableEntryByUid;
|
||||
pMetaReader->clearReader = metaReaderClear;
|
||||
|
||||
pMetaReader->getEntryGetUidCache = metaReaderGetTableEntryByUidCache;
|
||||
pMetaReader->getTableEntryByName = metaGetTableEntryByName;
|
||||
|
||||
pMetaReader->readerReleaseLock = metaReaderReleaseLock;
|
||||
}
|
||||
|
||||
|
|
|
@ -80,7 +80,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
|||
metaRsp.suid = mer1.me.uid;
|
||||
} else if (mer1.me.type == TSDB_CHILD_TABLE) {
|
||||
metaReaderInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
|
||||
if (metaGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
|
||||
if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
|
||||
|
||||
strcpy(metaRsp.stbName, mer2.me.name);
|
||||
metaRsp.suid = mer2.me.uid;
|
||||
|
@ -189,7 +189,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
|||
goto _exit;
|
||||
} else if (mer1.me.type == TSDB_CHILD_TABLE) {
|
||||
metaReaderInit(&mer2, pVnode->pMeta, 0);
|
||||
if (metaGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
|
||||
if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
|
||||
|
||||
strcpy(cfgRsp.stbName, mer2.me.name);
|
||||
schema = mer2.me.stbEntry.schemaRow;
|
||||
|
@ -410,13 +410,24 @@ void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
|||
"nBatchInsertSuccess");
|
||||
}
|
||||
|
||||
void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
|
||||
void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables) {
|
||||
SVnode* pVnodeObj = pVnode;
|
||||
SVnodeCfg* pConf = &pVnodeObj->config;
|
||||
|
||||
if (dbname) {
|
||||
*dbname = pVnode->config.dbname;
|
||||
*dbname = pConf->dbname;
|
||||
}
|
||||
|
||||
if (vgId) {
|
||||
*vgId = TD_VID(pVnode);
|
||||
*vgId = TD_VID(pVnodeObj);
|
||||
}
|
||||
|
||||
if (numOfTables) {
|
||||
*numOfTables = pConf->vndStats.numOfNTables + pConf->vndStats.numOfCTables;
|
||||
}
|
||||
|
||||
if (numOfNormalTables) {
|
||||
*numOfNormalTables = pConf->vndStats.numOfNTables;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -440,8 +451,10 @@ int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
|
|||
int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg) {
|
||||
return 0;
|
||||
}
|
||||
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
|
||||
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid, 1);
|
||||
|
||||
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) {
|
||||
SVnode *pVnodeObj = pVnode;
|
||||
SMCtbCursor *pCur = metaOpenCtbCursor(pVnodeObj->pMeta, suid, 1);
|
||||
|
||||
while (1) {
|
||||
tb_uid_t id = metaCtbCursorNext(pCur);
|
||||
|
@ -529,10 +542,8 @@ int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num) {
|
|||
for (int64_t i = 0; i < arrSize; ++i) {
|
||||
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
|
||||
|
||||
SMetaStbStats stats = {0};
|
||||
metaGetStbStats(pVnode->pMeta, suid, &stats);
|
||||
int64_t ctbNum = stats.ctbNum;
|
||||
// vnodeGetCtbNum(pVnode, id, &ctbNum);
|
||||
int64_t ctbNum = 0;
|
||||
metaGetStbStats(pVnode, suid, &ctbNum);
|
||||
|
||||
int numOfCols = 0;
|
||||
vnodeGetStbColumnNum(pVnode, suid, &numOfCols);
|
||||
|
@ -567,16 +578,17 @@ int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void *vnodeGetIdx(SVnode *pVnode) {
|
||||
void *vnodeGetIdx(void *pVnode) {
|
||||
if (pVnode == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
return metaGetIdx(pVnode->pMeta);
|
||||
|
||||
return metaGetIdx(((SVnode*)pVnode)->pMeta);
|
||||
}
|
||||
|
||||
void *vnodeGetIvtIdx(SVnode *pVnode) {
|
||||
void *vnodeGetIvtIdx(void *pVnode) {
|
||||
if (pVnode == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
return metaGetIvtIdx(pVnode->pMeta);
|
||||
return metaGetIvtIdx(((SVnode*)pVnode)->pMeta);
|
||||
}
|
||||
|
|
|
@ -515,6 +515,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||
initStorageAPI(&handle.api);
|
||||
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_SCH_QUERY:
|
||||
case TDMT_SCH_MERGE_QUERY:
|
||||
|
|
|
@ -95,7 +95,7 @@ struct SExecTaskInfo {
|
|||
};
|
||||
|
||||
void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst);
|
||||
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model);
|
||||
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI);
|
||||
void doDestroyTask(SExecTaskInfo* pTaskInfo);
|
||||
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
|
||||
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
|
||||
|
|
|
@ -307,7 +307,7 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle,
|
|||
SMetaReader mr = {0};
|
||||
|
||||
pAPI->metaReaderFn.initReader(&mr, metaHandle, 0);
|
||||
code = pAPI->metaReaderFn.readerGetEntryGetUidCache(&mr, info->uid);
|
||||
code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, info->uid);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
*pQualified = false;
|
||||
|
@ -1090,12 +1090,11 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
|
|||
}
|
||||
|
||||
if (!pTagCond) { // no tag filter condition exists, let's fetch all tables of this super table
|
||||
// vnodeGetCtbIdList();
|
||||
pStorageAPI->metaFn.storeGetChildTableList(pVnode, pScanNode->suid, pUidList);
|
||||
pStorageAPI->metaFn.getChildTableList(pVnode, pScanNode->suid, pUidList);
|
||||
} else {
|
||||
// failed to find the result in the cache, let try to calculate the results
|
||||
if (pTagIndexCond) {
|
||||
void* pIndex = pStorageAPI->metaFn.storeGetInvertIndex(pVnode);
|
||||
void* pIndex = pStorageAPI->metaFn.getInvertIndex(pVnode);
|
||||
|
||||
SIndexMetaArg metaArg = {
|
||||
.metaEx = pVnode, .idx = pStorageAPI->metaFn.storeGetIndexInfo(pVnode), .ivtIdx = pIndex, .suid = pScanNode->uid};
|
||||
|
@ -1170,7 +1169,7 @@ int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode,
|
|||
SMetaReader mr = {0};
|
||||
|
||||
pAPI->metaReaderFn.initReader(&mr, pVnode, 0);
|
||||
if (pAPI->metaReaderFn.readerGetEntryGetUidCache(&mr, uid) != 0) { // table not exist
|
||||
if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) { // table not exist
|
||||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
}
|
||||
|
|
|
@ -252,7 +252,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
|
|||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
|
||||
uint64_t id) {
|
||||
if (msg == NULL) { // create raw scan
|
||||
SExecTaskInfo* pTaskInfo = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE);
|
||||
SExecTaskInfo* pTaskInfo = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api);
|
||||
if (NULL == pTaskInfo) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
|
@ -264,6 +264,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pTaskInfo->storageAPI = pReaderHandle->api;
|
||||
qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pTaskInfo));
|
||||
return pTaskInfo;
|
||||
}
|
||||
|
@ -1092,8 +1093,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
|
||||
ASSERT(0);
|
||||
// walReaderVerifyOffset(pInfo->tqReader->pWalReader, pOffset);
|
||||
// if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) {
|
||||
// qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
|
||||
// if (tqReaderSeek(pInfo->tqReader, pOffset->version + 1, id) < 0) {
|
||||
// qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
|
||||
// return -1;
|
||||
// }
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
|
|
|
@ -35,7 +35,7 @@
|
|||
|
||||
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
|
||||
|
||||
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model) {
|
||||
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI) {
|
||||
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
||||
if (pTaskInfo == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -48,6 +48,7 @@ SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOP
|
|||
pTaskInfo->execModel = model;
|
||||
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
|
||||
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
|
||||
pTaskInfo->storageAPI = *pAPI;
|
||||
|
||||
taosInitRWLatch(&pTaskInfo->lock);
|
||||
|
||||
|
@ -55,7 +56,6 @@ SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOP
|
|||
pTaskInfo->id.queryId = queryId;
|
||||
pTaskInfo->id.str = taosMemoryMalloc(64);
|
||||
buildTaskId(taskId, queryId, pTaskInfo->id.str);
|
||||
|
||||
return pTaskInfo;
|
||||
}
|
||||
|
||||
|
@ -78,7 +78,7 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
|
|||
|
||||
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) {
|
||||
*pTaskInfo = doCreateTask(pPlan->id.queryId, taskId, vgId, model);
|
||||
*pTaskInfo = doCreateTask(pPlan->id.queryId, taskId, vgId, model, &pHandle->api);
|
||||
if (*pTaskInfo == NULL) {
|
||||
taosMemoryFree(sql);
|
||||
return terrno;
|
||||
|
@ -123,7 +123,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
|
|||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
||||
pAPI->metaReaderFn.initReader(&mr, pHandle->vnode, 0);
|
||||
int32_t code = pAPI->metaReaderFn.readerGetEntryGetUidCache(&mr, pScanNode->uid);
|
||||
int32_t code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, pScanNode->uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
|
||||
GET_TASKID(pTaskInfo));
|
||||
|
@ -144,7 +144,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
|
|||
tDecoderClear(&mr.coder);
|
||||
|
||||
tb_uid_t suid = mr.me.ctbEntry.suid;
|
||||
code = pAPI->metaReaderFn.readerGetEntryGetUidCache(&mr, suid);
|
||||
code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
return terrno;
|
||||
|
|
|
@ -532,7 +532,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
// 1. check if it is existed in meta cache
|
||||
if (pCache == NULL) {
|
||||
pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0);
|
||||
code = pHandle->api.metaReaderFn.readerGetEntryGetUidCache(&mr, pBlock->info.id.uid);
|
||||
code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed.
|
||||
if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
|
@ -561,7 +561,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
|
||||
if (h == NULL) {
|
||||
pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0);
|
||||
code = pHandle->api.metaReaderFn.readerGetEntryGetUidCache(&mr, pBlock->info.id.uid);
|
||||
code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
|
||||
|
@ -693,7 +693,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
while (true) {
|
||||
code = pAPI->tsdReader.tsdReaderNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
|
||||
code = pAPI->tsdReader.tsdNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
|
||||
if (code) {
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
|
@ -975,6 +975,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pInfo->sample.sampleRatio = pTableScanNode->ratio;
|
||||
pInfo->sample.seed = taosGetTimestampSec();
|
||||
|
||||
pInfo->readerAPI = pTaskInfo->storageAPI.tsdReader;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
||||
// blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||
|
@ -1103,7 +1104,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
|||
}
|
||||
|
||||
bool hasNext = false;
|
||||
code = pAPI->tsdReader.tsdReaderNextDataBlock(pReader, &hasNext);
|
||||
code = pAPI->tsdReader.tsdNextDataBlock(pReader, &hasNext);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
|
@ -2162,7 +2163,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
|||
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
bool hasNext = false;
|
||||
if (pInfo->dataReader) {
|
||||
code = pAPI->tsdReader.tsdReaderNextDataBlock(pInfo->dataReader, &hasNext);
|
||||
code = pAPI->tsdReader.tsdNextDataBlock(pInfo->dataReader, &hasNext);
|
||||
if (code) {
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
|
@ -2416,7 +2417,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
// set the extract column id to streamHandle
|
||||
pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds);
|
||||
SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
|
||||
code = pAPI->tqReaderFn.tqReaderSetTargetTableList(pInfo->tqReader, tableIdList);
|
||||
code = pAPI->tqReaderFn.tqReaderSetQueryTableList(pInfo->tqReader, tableIdList);
|
||||
if (code != 0) {
|
||||
taosArrayDestroy(tableIdList);
|
||||
goto _error;
|
||||
|
@ -2457,6 +2458,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pInfo->igExpired = pTableScanNode->igExpired;
|
||||
pInfo->twAggSup.maxTs = INT64_MIN;
|
||||
pInfo->pState = NULL;
|
||||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
|
||||
|
||||
// for stream
|
||||
if (pTaskInfo->streamInfo.pState) {
|
||||
|
@ -2674,7 +2677,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
|
||||
|
||||
while (true) {
|
||||
code = pAPI->tsdReader.tsdReaderNextDataBlock(reader, &hasNext);
|
||||
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
|
||||
if (code != 0) {
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
||||
pInfo->base.dataReader = NULL;
|
||||
|
@ -3354,7 +3357,7 @@ static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCount
|
|||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
||||
// get dbname
|
||||
pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode, &db, &vgId);
|
||||
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL);
|
||||
SName sn = {0};
|
||||
tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
|
||||
tNameGetDbName(&sn, dbName);
|
||||
|
@ -3396,7 +3399,7 @@ static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountSca
|
|||
pRes->info.id.groupId = groupId;
|
||||
|
||||
int64_t dbTableCount = 0;
|
||||
pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode, &dbTableCount);
|
||||
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, &dbTableCount, NULL);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
|
@ -3411,20 +3414,21 @@ static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanO
|
|||
if (strlen(pSupp->stbNameFilter) != 0) {
|
||||
tb_uid_t uid = 0;
|
||||
pAPI->metaFn.getTableUidByName(pInfo->readHandle.vnode, pSupp->stbNameFilter, &uid);
|
||||
SMetaStbStats stats = {0};
|
||||
ASSERT(0);
|
||||
// metaGetStbStats(pInfo->readHandle.vnode, uid, &stats);
|
||||
int64_t ctbNum = stats.ctbNum;
|
||||
fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, ctbNum, pRes);
|
||||
|
||||
int64_t numOfChildTables = 0;
|
||||
pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, uid, &numOfChildTables);
|
||||
|
||||
fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, numOfChildTables, pRes);
|
||||
} else {
|
||||
int64_t tbNumVnode = 0;//metaGetTbNum(pInfo->readHandle.vnode);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
|
||||
}
|
||||
} else {
|
||||
int64_t tbNumVnode = 0;//metaGetTbNum(pInfo->readHandle.vnode);
|
||||
pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode);
|
||||
int64_t tbNumVnode = 0;
|
||||
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, &tbNumVnode, NULL);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
|
||||
}
|
||||
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
|
|
|
@ -157,7 +157,7 @@ int32_t sysFilte__DbName(void* arg, SNode* pNode, SArray* result) {
|
|||
|
||||
const char* db = NULL;
|
||||
ASSERT(0);
|
||||
// pAPI->metaFn.storeGetBasicInfo(pVnode, &db, NULL);
|
||||
// pAPI->metaFn.getBasicInfo(pVnode, &db, NULL);
|
||||
|
||||
SName sn = {0};
|
||||
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
|
@ -185,7 +185,7 @@ int32_t sysFilte__VgroupId(void* arg, SNode* pNode, SArray* result) {
|
|||
|
||||
int64_t vgId = 0;
|
||||
ASSERT(0);
|
||||
// pAPI->metaFn.storeGetBasicInfo(pVnode, NULL, (int32_t*)&vgId);
|
||||
// pAPI->metaFn.getBasicInfo(pVnode, NULL, (int32_t*)&vgId);
|
||||
|
||||
SOperatorNode* pOper = (SOperatorNode*)pNode;
|
||||
SValueNode* pVal = (SValueNode*)pOper->pRight;
|
||||
|
@ -451,7 +451,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
|||
|
||||
const char* db = NULL;
|
||||
int32_t vgId = 0;
|
||||
pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode, &db, &vgId);
|
||||
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL);
|
||||
|
||||
SName sn = {0};
|
||||
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
|
@ -522,7 +522,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
|||
|
||||
int32_t ret = 0;
|
||||
if (pInfo->pCur == NULL) {
|
||||
pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.vnode);
|
||||
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
|
||||
}
|
||||
|
||||
if (pInfo->pSchema == NULL) {
|
||||
|
@ -614,7 +614,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
|||
|
||||
blockDataDestroy(dataBlock);
|
||||
if (ret != 0) {
|
||||
pAPI->metaFn.closeMetaCursor(pInfo->pCur);
|
||||
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||
pInfo->pCur = NULL;
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
|
@ -642,7 +642,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
|
||||
const char* db = NULL;
|
||||
int32_t vgId = 0;
|
||||
pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode, &db, &vgId);
|
||||
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL);
|
||||
|
||||
SName sn = {0};
|
||||
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
|
@ -702,7 +702,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
|
||||
int32_t ret = 0;
|
||||
if (pInfo->pCur == NULL) {
|
||||
pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.vnode);
|
||||
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
|
||||
}
|
||||
|
||||
bool blockFull = false;
|
||||
|
@ -722,7 +722,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno),
|
||||
GET_TASKID(pTaskInfo));
|
||||
pAPI->metaReaderFn.clearReader(&smrSuperTable);
|
||||
pAPI->metaFn.closeMetaCursor(pInfo->pCur);
|
||||
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||
pInfo->pCur = NULL;
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
@ -756,7 +756,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
|
||||
blockDataDestroy(dataBlock);
|
||||
if (ret != 0) {
|
||||
pAPI->metaFn.closeMetaCursor(pInfo->pCur);
|
||||
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||
pInfo->pCur = NULL;
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
|
@ -1113,7 +1113,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
|
||||
const char* db = NULL;
|
||||
int32_t vgId = 0;
|
||||
pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode, &db, &vgId);
|
||||
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL);
|
||||
|
||||
SName sn = {0};
|
||||
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
|
@ -1292,7 +1292,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
|
||||
SSysTableScanInfo* pInfo = pOperator->info;
|
||||
if (pInfo->pCur == NULL) {
|
||||
pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.vnode);
|
||||
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
|
||||
}
|
||||
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
|
@ -1300,7 +1300,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
|
||||
const char* db = NULL;
|
||||
int32_t vgId = 0;
|
||||
pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode, &db, &vgId);
|
||||
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL);
|
||||
|
||||
SName sn = {0};
|
||||
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
|
@ -1346,7 +1346,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
qError("failed to get super table meta, cname:%s, suid:0x%" PRIx64 ", code:%s, %s", pInfo->pCur->mr.me.name,
|
||||
suid, tstrerror(terrno), GET_TASKID(pTaskInfo));
|
||||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
pAPI->metaFn.closeMetaCursor(pInfo->pCur);
|
||||
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||
pInfo->pCur = NULL;
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
@ -1456,8 +1456,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
|
||||
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
|
||||
if (ret != 0) {
|
||||
pAPI->metaFn.closeMetaCursor(pInfo->pCur);
|
||||
pAPI->metaFn.closeMetaCursor(pInfo->pCur);
|
||||
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||
pInfo->pCur = NULL;
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
|
@ -1741,6 +1740,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
|||
|
||||
extractTbnameSlotId(pInfo, pScanNode);
|
||||
|
||||
pInfo->pAPI = &pTaskInfo->storageAPI;
|
||||
|
||||
pInfo->accountId = pScanPhyNode->accountId;
|
||||
pInfo->pUser = taosStrdup((void*)pUser);
|
||||
pInfo->sysInfo = pScanPhyNode->sysInfo;
|
||||
|
@ -1813,7 +1814,7 @@ void destroySysScanOperator(void* param) {
|
|||
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||
strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
|
||||
pInfo->pAPI->metaFn.closeMetaCursor(pInfo->pCur);
|
||||
pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||
pInfo->pCur = NULL;
|
||||
}
|
||||
if (pInfo->pIdx) {
|
||||
|
|
|
@ -2804,6 +2804,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
|
||||
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark);
|
||||
pInfo->dataVersion = 0;
|
||||
pInfo->statestore = pTaskInfo->storageAPI.stateStore;
|
||||
|
||||
pOperator->operatorType = pPhyNode->type;
|
||||
pOperator->blocking = true;
|
||||
|
@ -4938,7 +4939,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
pInfo->pState = taosMemoryCalloc(1, sizeof(void));
|
||||
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
||||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
|
||||
|
||||
|
@ -4986,6 +4987,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL,
|
||||
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
|
||||
|
||||
pInfo->statestore = pTaskInfo->storageAPI.stateStore;
|
||||
initIntervalDownStream(downstream, pPhyNode->type, pInfo);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -91,13 +91,14 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
|||
return winKeyCmprImpl(&pWin1->key, &pWin2->key);
|
||||
}
|
||||
|
||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
|
||||
qWarn("open stream state, %s", path);
|
||||
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) {
|
||||
qDebug("open stream state, %s", path);
|
||||
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
if (pState == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
|
||||
if (pState->pTdbState == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -105,29 +106,33 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SStreamTask* pStreamTask = pTask;
|
||||
char statePath[1024];
|
||||
if (!specPath) {
|
||||
sprintf(statePath, "%s/%d", path, pTask->id.taskId);
|
||||
sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId);
|
||||
} else {
|
||||
memset(statePath, 0, 1024);
|
||||
tstrncpy(statePath, path, 1024);
|
||||
}
|
||||
pState->taskId = pTask->id.taskId;
|
||||
pState->streamId = pTask->id.streamId;
|
||||
|
||||
pState->taskId = pStreamTask->id.taskId;
|
||||
pState->streamId = pStreamTask->id.streamId;
|
||||
|
||||
#ifdef USE_ROCKSDB
|
||||
// qWarn("open stream state1");
|
||||
taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
|
||||
int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState);
|
||||
SStreamMeta* pMeta = pStreamTask->pMeta;
|
||||
taosAcquireRef(pMeta->streamBackendId, pMeta->streamBackendRid);
|
||||
int code = streamStateOpenBackend(pMeta->streamBackend, pState);
|
||||
if (code == -1) {
|
||||
taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
|
||||
taosReleaseRef(pMeta->streamBackendId, pMeta->streamBackendRid);
|
||||
taosMemoryFree(pState);
|
||||
pState = NULL;
|
||||
}
|
||||
|
||||
pState->pTdbState->pOwner = pTask;
|
||||
pState->pFileState = NULL;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
|
||||
pState->parNameMap = tSimpleHashInit(1024, hashFn);
|
||||
|
||||
pState->parNameMap = tSimpleHashInit(1024, hashFn);
|
||||
return pState;
|
||||
|
||||
#else
|
||||
|
@ -449,7 +454,7 @@ int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pV
|
|||
#ifdef USE_ROCKSDB
|
||||
taosMemoryFree(pVal);
|
||||
#else
|
||||
streamFreeVal(pVal);
|
||||
streamStateFreeVal(pVal);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
@ -700,7 +705,7 @@ void streamStateFreeCur(SStreamStateCur* pCur) {
|
|||
taosMemoryFree(pCur);
|
||||
}
|
||||
|
||||
void streamFreeVal(void* val) {
|
||||
void streamStateFreeVal(void* val) {
|
||||
#ifdef USE_ROCKSDB
|
||||
taosMemoryFree(val);
|
||||
#else
|
||||
|
|
Loading…
Reference in New Issue