refact vnode
This commit is contained in:
parent
b932b5bca6
commit
272d0ec74a
|
@ -35,21 +35,13 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
// types
|
||||
// vnode
|
||||
typedef struct SVnode SVnode;
|
||||
typedef struct SMetaCfg SMetaCfg; // todo: remove
|
||||
typedef struct STsdbCfg STsdbCfg; // todo: remove
|
||||
typedef struct STqCfg STqCfg; // todo: remove
|
||||
typedef struct SVnodeCfg SVnodeCfg;
|
||||
typedef struct STqReadHandle STqReadHandle;
|
||||
typedef struct SMTbCursor SMTbCursor; // todo: remove
|
||||
typedef struct SMCtbCursor SMCtbCursor; // todo: remove
|
||||
typedef struct SMSmaCursor SMSmaCursor; // todo: remove
|
||||
typedef void *tsdbReaderT;
|
||||
typedef struct STsdbQueryCond STsdbQueryCond;
|
||||
typedef struct SDataStatis SDataStatis;
|
||||
|
||||
// vnode
|
||||
int vnodeInit();
|
||||
void vnodeCleanup();
|
||||
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg);
|
||||
|
@ -65,12 +57,59 @@ int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
|
|||
int32_t vnodeCompact(SVnode *pVnode);
|
||||
int32_t vnodeSync(SVnode *pVnode);
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
|
||||
|
||||
// meta
|
||||
typedef struct SMeta SMeta; // todo: remove
|
||||
typedef struct SMTbCursor SMTbCursor; // todo: remove
|
||||
typedef struct SMCtbCursor SMCtbCursor; // todo: remove
|
||||
typedef struct SMSmaCursor SMSmaCursor; // todo: remove
|
||||
|
||||
#define META_SUPER_TABLE TD_SUPER_TABLE
|
||||
#define META_CHILD_TABLE TD_CHILD_TABLE
|
||||
#define META_NORMAL_TABLE TD_NORMAL_TABLE
|
||||
|
||||
typedef SVCreateTbReq STbCfg;
|
||||
typedef SVCreateTSmaReq SSmaCfg;
|
||||
|
||||
SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF);
|
||||
void metaClose(SMeta *pMeta);
|
||||
void metaRemove(const char *path);
|
||||
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
|
||||
int metaDropTable(SMeta *pMeta, tb_uid_t uid);
|
||||
int metaCommit(SMeta *pMeta);
|
||||
int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg);
|
||||
int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid);
|
||||
STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
|
||||
STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
|
||||
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
||||
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
|
||||
void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode);
|
||||
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid);
|
||||
SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup);
|
||||
int metaGetTbNum(SMeta *pMeta);
|
||||
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
||||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||
char *metaTbCursorNext(SMTbCursor *pTbCur);
|
||||
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid);
|
||||
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
|
||||
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur);
|
||||
|
||||
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid);
|
||||
void metaCloseSmaCursor(SMSmaCursor *pSmaCur);
|
||||
int64_t metaSmaCursorNext(SMSmaCursor *pSmaCur);
|
||||
|
||||
// tsdb
|
||||
typedef struct STsdb STsdb;
|
||||
typedef struct SDataStatis SDataStatis;
|
||||
typedef struct STsdbQueryCond STsdbQueryCond;
|
||||
typedef void *tsdbReaderT;
|
||||
|
||||
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
|
||||
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
||||
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
||||
#define TABLE_TID(t) (t)->tid
|
||||
#define TABLE_UID(t) (t)->uid
|
||||
|
||||
tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
|
||||
uint64_t taskId);
|
||||
|
@ -92,6 +131,23 @@ int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STab
|
|||
void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
|
||||
|
||||
// tq
|
||||
enum {
|
||||
TQ_STREAM_TOKEN__DATA = 1,
|
||||
TQ_STREAM_TOKEN__WATERMARK,
|
||||
TQ_STREAM_TOKEN__CHECKPOINT,
|
||||
};
|
||||
|
||||
typedef struct STqReadHandle STqReadHandle;
|
||||
|
||||
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
|
||||
|
||||
void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList);
|
||||
int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
|
||||
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
|
||||
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
||||
bool tqNextDataBlock(STqReadHandle *pHandle);
|
||||
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
|
||||
SArray *tqRetrieveDataBlock(STqReadHandle *pHandle);
|
||||
|
||||
// need to remove
|
||||
int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg);
|
||||
|
@ -159,16 +215,9 @@ struct STqReadHandle {
|
|||
// ---------------------------- OLD ----------------------------
|
||||
typedef struct SMgmtWrapper SMgmtWrapper;
|
||||
|
||||
#define META_SUPER_TABLE TD_SUPER_TABLE
|
||||
#define META_CHILD_TABLE TD_CHILD_TABLE
|
||||
#define META_NORMAL_TABLE TD_NORMAL_TABLE
|
||||
|
||||
// Types exported
|
||||
|
||||
typedef SVCreateTbReq STbCfg;
|
||||
typedef SVCreateTSmaReq SSmaCfg;
|
||||
|
||||
typedef struct SDataStatis {
|
||||
struct SDataStatis {
|
||||
int16_t colId;
|
||||
int16_t maxIndex;
|
||||
int16_t minIndex;
|
||||
|
@ -176,7 +225,7 @@ typedef struct SDataStatis {
|
|||
int64_t sum;
|
||||
int64_t max;
|
||||
int64_t min;
|
||||
} SDataStatis;
|
||||
};
|
||||
|
||||
struct STsdbQueryCond {
|
||||
STimeWindow twindow;
|
||||
|
@ -198,24 +247,8 @@ typedef struct STable {
|
|||
STSchema *pSchema;
|
||||
} STable;
|
||||
|
||||
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
|
||||
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
||||
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
||||
|
||||
#define TABLE_TID(t) (t)->tid
|
||||
#define TABLE_UID(t) (t)->uid
|
||||
|
||||
/* ------------------------ SVnode ------------------------ */
|
||||
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
|
||||
|
||||
/* ------------------------ FOR COMPILE ------------------------ */
|
||||
|
||||
enum {
|
||||
TQ_STREAM_TOKEN__DATA = 1,
|
||||
TQ_STREAM_TOKEN__WATERMARK,
|
||||
TQ_STREAM_TOKEN__CHECKPOINT,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
int8_t type;
|
||||
int8_t reserved[7];
|
||||
|
@ -226,85 +259,7 @@ typedef struct {
|
|||
};
|
||||
} STqStreamToken;
|
||||
|
||||
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
|
||||
|
||||
static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList) {
|
||||
pReadHandle->pColIdList = pColIdList;
|
||||
}
|
||||
|
||||
// static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, int64_t tbUid) {
|
||||
// pHandle->tbUid = tbUid;
|
||||
//}
|
||||
|
||||
static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
|
||||
if (pHandle->tbIdHash) {
|
||||
taosHashClear(pHandle->tbIdHash);
|
||||
}
|
||||
|
||||
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i);
|
||||
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i);
|
||||
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
||||
bool tqNextDataBlock(STqReadHandle *pHandle);
|
||||
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
|
||||
// return SArray<SColumnInfoData>
|
||||
SArray *tqRetrieveDataBlock(STqReadHandle *pHandle);
|
||||
|
||||
// meta.h
|
||||
SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF);
|
||||
void metaClose(SMeta *pMeta);
|
||||
void metaRemove(const char *path);
|
||||
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
|
||||
int metaDropTable(SMeta *pMeta, tb_uid_t uid);
|
||||
int metaCommit(SMeta *pMeta);
|
||||
int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg);
|
||||
int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid);
|
||||
STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
|
||||
STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
|
||||
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
||||
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
|
||||
void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode);
|
||||
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid);
|
||||
SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup);
|
||||
int metaGetTbNum(SMeta *pMeta);
|
||||
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
||||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||
char *metaTbCursorNext(SMTbCursor *pTbCur);
|
||||
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid);
|
||||
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
|
||||
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur);
|
||||
|
||||
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid);
|
||||
void metaCloseSmaCursor(SMSmaCursor *pSmaCur);
|
||||
int64_t metaSmaCursorNext(SMSmaCursor *pSmaCur);
|
||||
|
||||
// Options
|
||||
void metaOptionsInit(SMetaCfg *pMetaCfg);
|
||||
|
|
|
@ -177,3 +177,41 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
|||
}
|
||||
return pArray;
|
||||
}
|
||||
|
||||
void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
|
||||
|
||||
int tqReadHandleSetTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
|
||||
if (pHandle->tbIdHash) {
|
||||
taosHashClear(pHandle->tbIdHash);
|
||||
}
|
||||
|
||||
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
|
||||
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
|
||||
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue