diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 40be3f1299..80fcde0f70 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -35,21 +35,13 @@ extern "C" { #endif -// types -typedef struct SVnode SVnode; -typedef struct SMetaCfg SMetaCfg; // todo: remove -typedef struct STsdbCfg STsdbCfg; // todo: remove -typedef struct STqCfg STqCfg; // todo: remove -typedef struct SVnodeCfg SVnodeCfg; -typedef struct STqReadHandle STqReadHandle; -typedef struct SMTbCursor SMTbCursor; // todo: remove -typedef struct SMCtbCursor SMCtbCursor; // todo: remove -typedef struct SMSmaCursor SMSmaCursor; // todo: remove -typedef void *tsdbReaderT; -typedef struct STsdbQueryCond STsdbQueryCond; -typedef struct SDataStatis SDataStatis; - // vnode +typedef struct SVnode SVnode; +typedef struct SMetaCfg SMetaCfg; // todo: remove +typedef struct STsdbCfg STsdbCfg; // todo: remove +typedef struct STqCfg STqCfg; // todo: remove +typedef struct SVnodeCfg SVnodeCfg; + int vnodeInit(); void vnodeCleanup(); SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); @@ -65,12 +57,59 @@ int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); +int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); // meta -typedef struct SMeta SMeta; // todo: remove +typedef struct SMeta SMeta; // todo: remove +typedef struct SMTbCursor SMTbCursor; // todo: remove +typedef struct SMCtbCursor SMCtbCursor; // todo: remove +typedef struct SMSmaCursor SMSmaCursor; // todo: remove + +#define META_SUPER_TABLE TD_SUPER_TABLE +#define META_CHILD_TABLE TD_CHILD_TABLE +#define META_NORMAL_TABLE TD_NORMAL_TABLE + +typedef SVCreateTbReq STbCfg; +typedef SVCreateTSmaReq SSmaCfg; + +SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF); +void metaClose(SMeta *pMeta); +void metaRemove(const char *path); +int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg); +int metaDropTable(SMeta *pMeta, tb_uid_t uid); +int metaCommit(SMeta *pMeta); +int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg); +int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid); +STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); +STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); +SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); +STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); +void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode); +STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid); +SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup); +int metaGetTbNum(SMeta *pMeta); +SMTbCursor *metaOpenTbCursor(SMeta *pMeta); +void metaCloseTbCursor(SMTbCursor *pTbCur); +char *metaTbCursorNext(SMTbCursor *pTbCur); +SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid); +void metaCloseCtbCurosr(SMCtbCursor *pCtbCur); +tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur); + +SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid); +void metaCloseSmaCursor(SMSmaCursor *pSmaCur); +int64_t metaSmaCursorNext(SMSmaCursor *pSmaCur); // tsdb -typedef struct STsdb STsdb; +typedef struct STsdb STsdb; +typedef struct SDataStatis SDataStatis; +typedef struct STsdbQueryCond STsdbQueryCond; +typedef void *tsdbReaderT; + +#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 +#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 +#define BLOCK_LOAD_TABLE_RR_ORDER 3 +#define TABLE_TID(t) (t)->tid +#define TABLE_UID(t) (t)->uid tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); @@ -92,6 +131,23 @@ int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STab void tsdbCleanupReadHandle(tsdbReaderT queryHandle); // tq +enum { + TQ_STREAM_TOKEN__DATA = 1, + TQ_STREAM_TOKEN__WATERMARK, + TQ_STREAM_TOKEN__CHECKPOINT, +}; + +typedef struct STqReadHandle STqReadHandle; + +STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); + +void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList); +int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); +int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); +int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); +bool tqNextDataBlock(STqReadHandle *pHandle); +int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo); +SArray *tqRetrieveDataBlock(STqReadHandle *pHandle); // need to remove int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg); @@ -159,16 +215,9 @@ struct STqReadHandle { // ---------------------------- OLD ---------------------------- typedef struct SMgmtWrapper SMgmtWrapper; -#define META_SUPER_TABLE TD_SUPER_TABLE -#define META_CHILD_TABLE TD_CHILD_TABLE -#define META_NORMAL_TABLE TD_NORMAL_TABLE - // Types exported -typedef SVCreateTbReq STbCfg; -typedef SVCreateTSmaReq SSmaCfg; - -typedef struct SDataStatis { +struct SDataStatis { int16_t colId; int16_t maxIndex; int16_t minIndex; @@ -176,7 +225,7 @@ typedef struct SDataStatis { int64_t sum; int64_t max; int64_t min; -} SDataStatis; +}; struct STsdbQueryCond { STimeWindow twindow; @@ -198,24 +247,8 @@ typedef struct STable { STSchema *pSchema; } STable; -#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 -#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 -#define BLOCK_LOAD_TABLE_RR_ORDER 3 - -#define TABLE_TID(t) (t)->tid -#define TABLE_UID(t) (t)->uid - -/* ------------------------ SVnode ------------------------ */ -int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); - /* ------------------------ FOR COMPILE ------------------------ */ -enum { - TQ_STREAM_TOKEN__DATA = 1, - TQ_STREAM_TOKEN__WATERMARK, - TQ_STREAM_TOKEN__CHECKPOINT, -}; - typedef struct { int8_t type; int8_t reserved[7]; @@ -226,85 +259,7 @@ typedef struct { }; } STqStreamToken; -STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); - -static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList) { - pReadHandle->pColIdList = pColIdList; -} - -// static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, int64_t tbUid) { -// pHandle->tbUid = tbUid; -//} - -static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) { - if (pHandle->tbIdHash) { - taosHashClear(pHandle->tbIdHash); - } - - pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if (pHandle->tbIdHash == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { - int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i); - taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); - } - - return 0; -} - -static FORCE_INLINE int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) { - if (pHandle->tbIdHash == NULL) { - pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if (pHandle->tbIdHash == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - } - - for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { - int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i); - taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); - } - - return 0; -} - -int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); -bool tqNextDataBlock(STqReadHandle *pHandle); -int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo); -// return SArray -SArray *tqRetrieveDataBlock(STqReadHandle *pHandle); - // meta.h -SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF); -void metaClose(SMeta *pMeta); -void metaRemove(const char *path); -int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg); -int metaDropTable(SMeta *pMeta, tb_uid_t uid); -int metaCommit(SMeta *pMeta); -int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg); -int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid); -STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); -STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); -SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); -STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); -void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode); -STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid); -SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup); -int metaGetTbNum(SMeta *pMeta); -SMTbCursor *metaOpenTbCursor(SMeta *pMeta); -void metaCloseTbCursor(SMTbCursor *pTbCur); -char *metaTbCursorNext(SMTbCursor *pTbCur); -SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid); -void metaCloseCtbCurosr(SMCtbCursor *pCtbCur); -tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur); - -SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid); -void metaCloseSmaCursor(SMSmaCursor *pSmaCur); -int64_t metaSmaCursorNext(SMSmaCursor *pSmaCur); // Options void metaOptionsInit(SMetaCfg *pMetaCfg); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index c69f8ce09a..21dc6e35e8 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -88,7 +88,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList); pBlockInfo->rows = pHandle->pBlock->numOfRows; -// pBlockInfo->uid = pHandle->pBlock->uid; // the uid can not be assigned to pBlockData. + // pBlockInfo->uid = pHandle->pBlock->uid; // the uid can not be assigned to pBlockData. return 0; } @@ -177,3 +177,41 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { } return pArray; } + +void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; } + +int tqReadHandleSetTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { + if (pHandle->tbIdHash) { + taosHashClear(pHandle->tbIdHash); + } + + pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if (pHandle->tbIdHash == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { + int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); + taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); + } + + return 0; +} + +int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { + if (pHandle->tbIdHash == NULL) { + pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if (pHandle->tbIdHash == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + + for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { + int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); + taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); + } + + return 0; +}