add SVCreateSmaReq/metaGetSmaTbUids
This commit is contained in:
parent
8f108bfb83
commit
8eaa5ec501
|
@ -1871,15 +1871,27 @@ typedef struct {
|
||||||
} STSma; // Time-range-wise SMA
|
} STSma; // Time-range-wise SMA
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t msgType; // 0 create, 1 recreate
|
int64_t ver; // use a general definition
|
||||||
STSma tSma;
|
STSma tSma;
|
||||||
STimeWindow window;
|
} SVCreateTSmaReq;
|
||||||
} SCreateTSmaMsg;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STimeWindow window;
|
int8_t type; // 0 status report, 1 update data
|
||||||
char indexName[TSDB_INDEX_NAME_LEN + 1];
|
char indexName[TSDB_INDEX_NAME_LEN + 1]; //
|
||||||
} SDropTSmaMsg;
|
STimeWindow windows;
|
||||||
|
} STSmaMsg;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t ver; // use a general definition
|
||||||
|
char indexName[TSDB_INDEX_NAME_LEN + 1];
|
||||||
|
} SVDropTSmaReq;
|
||||||
|
typedef struct {
|
||||||
|
} SVCreateTSmaRsp, SVDropTSmaRsp;
|
||||||
|
|
||||||
|
int32_t tSerializeSVCreateTSmaReq(void** buf, SVCreateTSmaReq* pReq);
|
||||||
|
void* tDeserializeSVCreateTSmaReq(void* buf, SVCreateTSmaReq* pReq);
|
||||||
|
int32_t tSerializeSVDropTSmaReq(void** buf, SVDropTSmaReq* pReq);
|
||||||
|
void* tDeserializeSVDropTSmaReq(void* buf, SVDropTSmaReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STimeWindow tsWindow; // [skey, ekey]
|
STimeWindow tsWindow; // [skey, ekey]
|
||||||
|
@ -1901,22 +1913,18 @@ static FORCE_INLINE void tdDestroySmaData(STSmaData* pSmaData) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RSma: Time-range-wise Rollup SMA
|
// RSma: Rollup SMA
|
||||||
// TODO: refactor when rSma grammar defined finally =>
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
int32_t retention; // unit: day
|
int32_t retention; // unit: day
|
||||||
uint16_t days; // unit: day
|
uint16_t days; // unit: day
|
||||||
int8_t intervalUnit;
|
int8_t intervalUnit;
|
||||||
} SSmaParams;
|
} SSmaParams;
|
||||||
// TODO: refactor when rSma grammar defined finally <=
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
// TODO: refactor to use the real schema =>
|
|
||||||
STSma tsma;
|
STSma tsma;
|
||||||
float xFilesFactor;
|
float xFilesFactor;
|
||||||
SArray* smaParams; // SSmaParams
|
SArray* smaParams; // SSmaParams
|
||||||
// TODO: refactor to use the real schema <=
|
|
||||||
} SRSma;
|
} SRSma;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -2390,3 +2390,36 @@ int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq)
|
||||||
tEndDecode(decoder);
|
tEndDecode(decoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
|
||||||
|
tlen += taosEncodeFixedI64(buf, pReq->ver);
|
||||||
|
tlen += tEncodeTSma(buf, &pReq->tSma);
|
||||||
|
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *tDeserializeSVCreateTSmaReq(void *buf, SVCreateTSmaReq *pReq) {
|
||||||
|
buf = taosDecodeFixedI64(buf, &(pReq->ver));
|
||||||
|
|
||||||
|
if ((buf = tDecodeTSma(buf, &pReq->tSma)) == NULL) {
|
||||||
|
tdDestroyTSma(&pReq->tSma);
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSVDropTSmaReq(void **buf, SVDropTSmaReq *pReq) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
|
||||||
|
tlen += taosEncodeFixedI64(buf, pReq->ver);
|
||||||
|
tlen += taosEncodeString(buf, pReq->indexName);
|
||||||
|
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) {
|
||||||
|
buf = taosDecodeFixedI64(buf, &(pReq->ver));
|
||||||
|
buf = taosDecodeStringTo(buf, pReq->indexName);
|
||||||
|
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
|
@ -40,24 +40,27 @@ typedef struct SMTbCursor SMTbCursor;
|
||||||
typedef struct SMCtbCursor SMCtbCursor;
|
typedef struct SMCtbCursor SMCtbCursor;
|
||||||
typedef struct SMSmaCursor SMSmaCursor;
|
typedef struct SMSmaCursor SMSmaCursor;
|
||||||
|
|
||||||
typedef SVCreateTbReq STbCfg;
|
typedef SVCreateTbReq STbCfg;
|
||||||
typedef STSma SSmaCfg;
|
typedef SVCreateTSmaReq SSmaCfg;
|
||||||
|
|
||||||
// SMeta operations
|
// SMeta operations
|
||||||
SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF);
|
SMeta * metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF);
|
||||||
void metaClose(SMeta *pMeta);
|
void metaClose(SMeta *pMeta);
|
||||||
void metaRemove(const char *path);
|
void metaRemove(const char *path);
|
||||||
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
|
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
|
||||||
int metaDropTable(SMeta *pMeta, tb_uid_t uid);
|
int metaDropTable(SMeta *pMeta, tb_uid_t uid);
|
||||||
int metaCommit(SMeta *pMeta);
|
int metaCommit(SMeta *pMeta);
|
||||||
|
int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg);
|
||||||
|
int32_t metaDropTSma(SMeta *pMeta, char *indexName);
|
||||||
|
|
||||||
// For Query
|
// For Query
|
||||||
STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
|
STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
|
||||||
STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
|
STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
|
||||||
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
||||||
STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
|
STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
|
||||||
SSmaCfg * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName);
|
STSma * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName);
|
||||||
STSmaWrapper * metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid);
|
STSmaWrapper * metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid);
|
||||||
|
SArray * metaGetSmaTbUids(SMeta *pMeta, bool isDup);
|
||||||
|
|
||||||
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
||||||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||||
|
|
|
@ -33,7 +33,7 @@ int metaOpenDB(SMeta* pMeta);
|
||||||
void metaCloseDB(SMeta* pMeta);
|
void metaCloseDB(SMeta* pMeta);
|
||||||
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg);
|
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg);
|
||||||
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
|
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
|
||||||
int metaSaveSmaToDB(SMeta* pMeta, SSmaCfg* pTbCfg);
|
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
|
||||||
int metaRemoveSmaFromDb(SMeta* pMeta, const char* indexName);
|
int metaRemoveSmaFromDb(SMeta* pMeta, const char* indexName);
|
||||||
|
|
||||||
// SMetaCache
|
// SMetaCache
|
||||||
|
|
|
@ -41,55 +41,4 @@ static FORCE_INLINE int32_t tsdbEncodeTSmaKey(uint64_t tableUid, col_id_t colId,
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int minFid;
|
|
||||||
int midFid;
|
|
||||||
int maxFid;
|
|
||||||
TSKEY minKey;
|
|
||||||
} SRtn;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
uint64_t uid;
|
|
||||||
int64_t offset;
|
|
||||||
int64_t size;
|
|
||||||
} SKVRecord;
|
|
||||||
|
|
||||||
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
|
|
||||||
|
|
||||||
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
|
|
||||||
if (key < 0) {
|
|
||||||
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
|
|
||||||
} else {
|
|
||||||
return (int)((key / tsTickPerDay[precision] / days));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
|
|
||||||
if (fid >= pRtn->maxFid) {
|
|
||||||
return 0;
|
|
||||||
} else if (fid >= pRtn->midFid) {
|
|
||||||
return 1;
|
|
||||||
} else if (fid >= pRtn->minFid) {
|
|
||||||
return 2;
|
|
||||||
} else {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
|
|
||||||
|
|
||||||
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
|
|
||||||
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
|
|
||||||
void *tsdbCommitData(STsdbRepo *pRepo);
|
|
||||||
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
|
|
||||||
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx);
|
|
||||||
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
|
|
||||||
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock,
|
|
||||||
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf);
|
|
||||||
int tsdbApplyRtn(STsdbRepo *pRepo);
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /* _TD_TSDB_SMA_H_ */
|
#endif /* _TD_TSDB_SMA_H_ */
|
|
@ -226,7 +226,7 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaSaveSmaToDB(SMeta *pMeta, SSmaCfg *pSmaCfg) {
|
int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
|
||||||
char buf[512] = {0}; // TODO: may overflow
|
char buf[512] = {0}; // TODO: may overflow
|
||||||
void *pBuf = NULL;
|
void *pBuf = NULL;
|
||||||
DBT key1 = {0}, value1 = {0};
|
DBT key1 = {0}, value1 = {0};
|
||||||
|
@ -485,7 +485,7 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaSmaIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
|
static int metaSmaIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
|
||||||
SSmaCfg *pSmaCfg = (SSmaCfg *)(pValue->app_data);
|
STSma *pSmaCfg = (STSma *)(pValue->app_data);
|
||||||
|
|
||||||
memset(pSKey, 0, sizeof(*pSKey));
|
memset(pSKey, 0, sizeof(*pSKey));
|
||||||
pSKey->data = &(pSmaCfg->tableUid);
|
pSKey->data = &(pSmaCfg->tableUid);
|
||||||
|
@ -609,8 +609,8 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
|
||||||
return pTbCfg;
|
return pTbCfg;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSmaCfg *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) {
|
STSma *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) {
|
||||||
SSmaCfg *pCfg = NULL;
|
STSma * pCfg = NULL;
|
||||||
SMetaDB *pDB = pMeta->pDB;
|
SMetaDB *pDB = pMeta->pDB;
|
||||||
DBT key = {0};
|
DBT key = {0};
|
||||||
DBT value = {0};
|
DBT value = {0};
|
||||||
|
@ -629,7 +629,7 @@ SSmaCfg *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode
|
// Decode
|
||||||
pCfg = (SSmaCfg *)malloc(sizeof(SSmaCfg));
|
pCfg = (STSma *)malloc(sizeof(STSma));
|
||||||
if (pCfg == NULL) {
|
if (pCfg == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -885,8 +885,8 @@ STSmaWrapper *metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
DBT skey = {.data = &(pCur->uid)};
|
DBT skey = {.data = &(pCur->uid), .size = sizeof(pCur->uid)};
|
||||||
DBT pval = {.size = sizeof(pCur->uid)};
|
DBT pval = {0};
|
||||||
void *pBuf = NULL;
|
void *pBuf = NULL;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -914,10 +914,49 @@ STSmaWrapper *metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid) {
|
||||||
}
|
}
|
||||||
|
|
||||||
metaCloseSmaCurosr(pCur);
|
metaCloseSmaCurosr(pCur);
|
||||||
|
|
||||||
return pSW;
|
return pSW;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) {
|
||||||
|
SArray * pUids = NULL;
|
||||||
|
SMetaDB *pDB = pMeta->pDB;
|
||||||
|
DBC * pCur = NULL;
|
||||||
|
DBT pkey = {0}, pval = {0};
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
pUids = taosArrayInit(16, sizeof(tb_uid_t));
|
||||||
|
|
||||||
|
if (!pUids) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: lock?
|
||||||
|
ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &pCur, 0);
|
||||||
|
if (ret != 0) {
|
||||||
|
taosArrayDestroy(pUids);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *pBuf = NULL;
|
||||||
|
|
||||||
|
// TODO: lock?
|
||||||
|
while (true) {
|
||||||
|
ret = pCur->get(pCur, &pkey, &pval, isDup ? DB_NEXT_DUP : DB_NEXT_NODUP);
|
||||||
|
if(ret == 0) {
|
||||||
|
taosArrayPush(pUids, pkey.data);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCur) {
|
||||||
|
pCur->close(pCur);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pUids;
|
||||||
|
}
|
||||||
|
|
||||||
static void metaDBWLock(SMetaDB *pDB) {
|
static void metaDBWLock(SMetaDB *pDB) {
|
||||||
#if IMPL_WITH_LOCK
|
#if IMPL_WITH_LOCK
|
||||||
pthread_rwlock_wrlock(&(pDB->rwlock));
|
pthread_rwlock_wrlock(&(pDB->rwlock));
|
||||||
|
|
|
@ -107,19 +107,27 @@ int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaCreateSma(SMeta *pMeta, SSmaCfg *pSmaCfg) {
|
int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg) {
|
||||||
// Validate the tbOptions
|
// TODO: Validate the cfg
|
||||||
// if (metaValidateTbCfg(pMeta, pTbCfg) < 0) {
|
// The table uid should exists and be super table or common table.
|
||||||
// // TODO: handle error
|
// Check other cfg value
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// TODO: add atomicity
|
// TODO: add atomicity
|
||||||
|
|
||||||
if (metaSaveSmaToDB(pMeta, pSmaCfg) < 0) {
|
if (metaSaveSmaToDB(pMeta, &pCfg->tSma) < 0) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t metaDropTSma(SMeta *pMeta, char* indexName) {
|
||||||
|
// TODO: Validate the cfg
|
||||||
|
// TODO: add atomicity
|
||||||
|
|
||||||
|
if (metaRemoveSmaFromDb(pMeta, indexName) < 0) {
|
||||||
|
// TODO: handle error
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
|
@ -50,3 +50,4 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,7 +69,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: maybe need to clear the requst struct
|
// TODO: maybe need to clear the request struct
|
||||||
free(vCreateTbReq.stbCfg.pSchema);
|
free(vCreateTbReq.stbCfg.pSchema);
|
||||||
free(vCreateTbReq.stbCfg.pTagSchema);
|
free(vCreateTbReq.stbCfg.pTagSchema);
|
||||||
free(vCreateTbReq.name);
|
free(vCreateTbReq.name);
|
||||||
|
@ -133,13 +133,44 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
|
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
|
||||||
// 1. tdCreateSmaMeta(pVnode->pMeta,...);
|
SSmaCfg vCreateSmaReq = {0};
|
||||||
// 2. tdCreateSmaDataInit();
|
if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {
|
||||||
// 3. tdCreateSmaData
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (metaCreateTSma(pVnode->pMeta, &vCreateSmaReq) < 0) {
|
||||||
|
// TODO: handle error
|
||||||
|
tdDestroyTSma(&vCreateSmaReq.tSma);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
// TODO: send msg to stream computing to create tSma
|
||||||
|
// if ((send msg to stream computing) < 0) {
|
||||||
|
// tdDestroyTSma(&vCreateSmaReq);
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
tdDestroyTSma(&vCreateSmaReq.tSma);
|
||||||
|
// TODO: return directly or go on follow steps?
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
|
case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_DROP_SMA: { // timeRangeSMA
|
case TDMT_VND_DROP_SMA: { // timeRangeSMA
|
||||||
|
SVDropTSmaReq vDropSmaReq = {0};
|
||||||
|
if (tDeserializeSVDropTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vDropSmaReq) == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexName) < 0) {
|
||||||
|
// TODO: handle error
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
// TODO: send msg to stream computing to drop tSma
|
||||||
|
// if ((send msg to stream computing) < 0) {
|
||||||
|
// tdDestroyTSma(&vCreateSmaReq);
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
// TODO: return directly or go on follow steps?
|
||||||
} break;
|
} break;
|
||||||
default:
|
default:
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
|
@ -103,6 +103,7 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
|
||||||
const char *smaIndexName2 = "sma_index_test_2";
|
const char *smaIndexName2 = "sma_index_test_2";
|
||||||
const char *smaTestDir = "./smaTest";
|
const char *smaTestDir = "./smaTest";
|
||||||
const uint64_t tbUid = 1234567890;
|
const uint64_t tbUid = 1234567890;
|
||||||
|
const uint32_t nCntTSma = 2;
|
||||||
// encode
|
// encode
|
||||||
STSma tSma = {0};
|
STSma tSma = {0};
|
||||||
tSma.version = 0;
|
tSma.version = 0;
|
||||||
|
@ -125,7 +126,7 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SMeta * pMeta = NULL;
|
SMeta * pMeta = NULL;
|
||||||
SSmaCfg * pSmaCfg = &tSma;
|
STSma * pSmaCfg = &tSma;
|
||||||
const SMetaCfg *pMetaCfg = &defaultMetaOptions;
|
const SMetaCfg *pMetaCfg = &defaultMetaOptions;
|
||||||
|
|
||||||
taosRemoveDir(smaTestDir);
|
taosRemoveDir(smaTestDir);
|
||||||
|
@ -146,14 +147,14 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
|
||||||
metaSaveSmaToDB(pMeta, pSmaCfg);
|
metaSaveSmaToDB(pMeta, pSmaCfg);
|
||||||
|
|
||||||
// get value by indexName
|
// get value by indexName
|
||||||
SSmaCfg *qSmaCfg = NULL;
|
STSma *qSmaCfg = NULL;
|
||||||
qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName1);
|
qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName1);
|
||||||
assert(qSmaCfg != NULL);
|
assert(qSmaCfg != NULL);
|
||||||
printf("name1 = %s\n", qSmaCfg->indexName);
|
printf("name1 = %s\n", qSmaCfg->indexName);
|
||||||
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1);
|
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1);
|
||||||
EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid);
|
EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid);
|
||||||
tdDestroyTSma(qSmaCfg);
|
tdDestroyTSma(qSmaCfg);
|
||||||
free(qSmaCfg);
|
tfree(qSmaCfg);
|
||||||
|
|
||||||
qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName2);
|
qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName2);
|
||||||
assert(qSmaCfg != NULL);
|
assert(qSmaCfg != NULL);
|
||||||
|
@ -161,7 +162,7 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
|
||||||
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2);
|
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2);
|
||||||
EXPECT_EQ(qSmaCfg->interval, tSma.interval);
|
EXPECT_EQ(qSmaCfg->interval, tSma.interval);
|
||||||
tdDestroyTSma(qSmaCfg);
|
tdDestroyTSma(qSmaCfg);
|
||||||
free(qSmaCfg);
|
tfree(qSmaCfg);
|
||||||
|
|
||||||
// get index name by table uid
|
// get index name by table uid
|
||||||
SMSmaCursor *pSmaCur = metaOpenSmaCursor(pMeta, tbUid);
|
SMSmaCursor *pSmaCur = metaOpenSmaCursor(pMeta, tbUid);
|
||||||
|
@ -175,17 +176,30 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
|
||||||
printf("indexName = %s\n", indexName);
|
printf("indexName = %s\n", indexName);
|
||||||
++indexCnt;
|
++indexCnt;
|
||||||
}
|
}
|
||||||
EXPECT_EQ(indexCnt, 2);
|
EXPECT_EQ(indexCnt, nCntTSma);
|
||||||
metaCloseSmaCurosr(pSmaCur);
|
metaCloseSmaCurosr(pSmaCur);
|
||||||
|
|
||||||
// get wrapper by table uid
|
// get wrapper by table uid
|
||||||
STSmaWrapper *pSW = metaGetSmaInfoByUid(pMeta, tbUid);
|
STSmaWrapper *pSW = metaGetSmaInfoByUid(pMeta, tbUid);
|
||||||
assert(pSW != NULL);
|
assert(pSW != NULL);
|
||||||
EXPECT_EQ(pSW->number, 2);
|
EXPECT_EQ(pSW->number, nCntTSma);
|
||||||
EXPECT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1);
|
EXPECT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1);
|
||||||
EXPECT_EQ(pSW->tSma->tableUid, tSma.tableUid);
|
EXPECT_EQ(pSW->tSma->tableUid, tSma.tableUid);
|
||||||
EXPECT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2);
|
EXPECT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2);
|
||||||
EXPECT_EQ((pSW->tSma + 1)->tableUid, tSma.tableUid);
|
EXPECT_EQ((pSW->tSma + 1)->tableUid, tSma.tableUid);
|
||||||
|
|
||||||
|
tdDestroyTSmaWrapper(pSW);
|
||||||
|
tfree(pSW);
|
||||||
|
|
||||||
|
// get all sma table uids
|
||||||
|
SArray *pUids = metaGetSmaTbUids(pMeta, false);
|
||||||
|
assert(pUids != NULL);
|
||||||
|
for (uint32_t i = 0; i < taosArrayGetSize(pUids); ++i) {
|
||||||
|
printf("metaGetSmaTbUids: uid[%" PRIu32 "] = %" PRIi64 "\n", i, *(tb_uid_t *)taosArrayGet(pUids, i));
|
||||||
|
// printf("metaGetSmaTbUids: index[%" PRIu32 "] = %s", i, (char *)taosArrayGet(pUids, i));
|
||||||
|
}
|
||||||
|
EXPECT_EQ(taosArrayGetSize(pUids), 1);
|
||||||
|
taosArrayDestroy(pUids);
|
||||||
|
|
||||||
// resource release
|
// resource release
|
||||||
metaRemoveSmaFromDb(pMeta, smaIndexName1);
|
metaRemoveSmaFromDb(pMeta, smaIndexName1);
|
||||||
|
|
Loading…
Reference in New Issue