drop tSma logic of tsdb
This commit is contained in:
parent
867ca85ab1
commit
6bd84f7fb2
|
@ -63,9 +63,11 @@ typedef enum {
|
|||
} ETsdbStatisStatus;
|
||||
|
||||
typedef enum {
|
||||
TSDB_SMA_STAT_OK = 0, // ready to provide service
|
||||
TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired
|
||||
} ETsdbSmaStat;
|
||||
TSDB_SMA_STAT_UNKNOWN = -1, // unknown
|
||||
TSDB_SMA_STAT_OK = 0, // ready to provide service
|
||||
TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired
|
||||
TSDB_SMA_STAT_DROPPED = 2, // sma dropped
|
||||
} ETsdbSmaStat; // bit operation
|
||||
|
||||
typedef enum {
|
||||
TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA
|
||||
|
|
|
@ -336,8 +336,9 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615)
|
||||
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616)
|
||||
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x0617)
|
||||
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0618)
|
||||
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0619)
|
||||
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0618)
|
||||
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0619)
|
||||
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x0620)
|
||||
|
||||
// query
|
||||
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
|
||||
|
|
|
@ -313,7 +313,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
|||
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
|
||||
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]);
|
||||
}
|
||||
if(pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) {
|
||||
if(pReq->rollup && pReq->stbCfg.pRSmaParam) {
|
||||
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
|
||||
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
|
||||
tlen += taosEncodeFixedI8(buf, param->delayUnit);
|
||||
|
@ -336,12 +336,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
|||
tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes);
|
||||
tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name);
|
||||
}
|
||||
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols);
|
||||
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
|
||||
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]);
|
||||
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nBSmaCols);
|
||||
for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) {
|
||||
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pBSmaCols[i]);
|
||||
}
|
||||
if(pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) {
|
||||
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
|
||||
if(pReq->rollup && pReq->ntbCfg.pRSmaParam) {
|
||||
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
|
||||
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
|
||||
tlen += taosEncodeFixedI8(buf, param->delayUnit);
|
||||
tlen += taosEncodeFixedI8(buf, param->nFuncIds);
|
||||
|
@ -424,18 +424,18 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
|||
buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes);
|
||||
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
|
||||
}
|
||||
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols));
|
||||
if(pReq->stbCfg.nBSmaCols > 0) {
|
||||
pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t));
|
||||
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
|
||||
buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i);
|
||||
buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols));
|
||||
if(pReq->ntbCfg.nBSmaCols > 0) {
|
||||
pReq->ntbCfg.pBSmaCols = (col_id_t *)malloc(pReq->ntbCfg.nBSmaCols * sizeof(col_id_t));
|
||||
for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) {
|
||||
buf = taosDecodeFixedI16(buf, pReq->ntbCfg.pBSmaCols + i);
|
||||
}
|
||||
} else {
|
||||
pReq->stbCfg.pBSmaCols = NULL;
|
||||
pReq->ntbCfg.pBSmaCols = NULL;
|
||||
}
|
||||
if(pReq->rollup) {
|
||||
pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam));
|
||||
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
|
||||
pReq->ntbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam));
|
||||
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
|
||||
buf = taosDecodeFixedU32(buf, (uint32_t*)¶m->xFilesFactor);
|
||||
buf = taosDecodeFixedI8(buf, ¶m->delayUnit);
|
||||
buf = taosDecodeFixedI8(buf, ¶m->nFuncIds);
|
||||
|
@ -448,7 +448,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
|||
}
|
||||
buf = taosDecodeFixedI64(buf, ¶m->delay);
|
||||
} else {
|
||||
pReq->stbCfg.pRSmaParam = NULL;
|
||||
pReq->ntbCfg.pRSmaParam = NULL;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
|
|
|
@ -51,7 +51,7 @@ int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
|
|||
int metaDropTable(SMeta *pMeta, tb_uid_t uid);
|
||||
int metaCommit(SMeta *pMeta);
|
||||
int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg);
|
||||
int32_t metaDropTSma(SMeta *pMeta, char *indexName);
|
||||
int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid);
|
||||
|
||||
// For Query
|
||||
STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
|
||||
|
|
|
@ -96,6 +96,7 @@ int tsdbCommit(STsdb *pTsdb);
|
|||
*/
|
||||
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg);
|
||||
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg);
|
||||
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
|
||||
|
||||
/**
|
||||
* @brief Insert RSma(Time-range-wise Rollup SMA) data.
|
||||
|
|
|
@ -34,7 +34,7 @@ void metaCloseDB(SMeta* pMeta);
|
|||
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg);
|
||||
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
|
||||
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
|
||||
int metaRemoveSmaFromDb(SMeta* pMeta, const char* indexName);
|
||||
int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
|
||||
|
||||
// SMetaCache
|
||||
int metaOpenCache(SMeta* pMeta);
|
||||
|
|
|
@ -16,15 +16,15 @@
|
|||
#ifndef _TD_TSDB_SMA_H_
|
||||
#define _TD_TSDB_SMA_H_
|
||||
|
||||
typedef struct SSmaStat SSmaStat;
|
||||
typedef struct SSmaEnv SSmaEnv;
|
||||
typedef struct SSmaStat SSmaStat;
|
||||
typedef struct SSmaEnv SSmaEnv;
|
||||
|
||||
struct SSmaEnv {
|
||||
TdThreadRwlock lock;
|
||||
SDiskID did;
|
||||
TDBEnv dbEnv; // TODO: If it's better to put it in smaIndex level?
|
||||
char * path; // relative path
|
||||
SSmaStat * pStat;
|
||||
SDiskID did;
|
||||
TDBEnv dbEnv; // TODO: If it's better to put it in smaIndex level?
|
||||
char *path; // relative path
|
||||
SSmaStat *pStat;
|
||||
};
|
||||
|
||||
#define SMA_ENV_LOCK(env) ((env)->lock)
|
||||
|
@ -77,4 +77,6 @@ static FORCE_INLINE int tsdbUnLockSma(SSmaEnv *pEnv) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
#endif /* _TD_TSDB_SMA_H_ */
|
|
@ -259,7 +259,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int metaRemoveSmaFromDb(SMeta *pMeta, const char *indexName) {
|
||||
int metaRemoveSmaFromDb(SMeta *pMeta, int64_t indexUid) {
|
||||
// TODO
|
||||
#if 0
|
||||
DBT key = {0};
|
||||
|
|
|
@ -121,11 +121,11 @@ int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t metaDropTSma(SMeta *pMeta, char* indexName) {
|
||||
int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) {
|
||||
// TODO: Validate the cfg
|
||||
// TODO: add atomicity
|
||||
|
||||
if (metaRemoveSmaFromDb(pMeta, indexName) < 0) {
|
||||
if (metaRemoveSmaFromDb(pMeta, indexUid) < 0) {
|
||||
// TODO: handle error
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ static const char *TSDB_SMA_DNAME[] = {
|
|||
#define SMA_STORAGE_TSDB_TIMES 10
|
||||
#define SMA_STORAGE_SPLIT_HOURS 24
|
||||
#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8
|
||||
#define SMA_DROP_EXPIRED_TIME 10 // default is 10 seconds
|
||||
|
||||
#define SMA_STATE_HASH_SLOT 4
|
||||
#define SMA_STATE_ITEM_HASH_SLOT 32
|
||||
|
@ -60,10 +61,11 @@ typedef struct {
|
|||
typedef struct {
|
||||
/**
|
||||
* @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service.
|
||||
* - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
|
||||
* without information about its previous state.
|
||||
* - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from
|
||||
* Streaming Module or TSDB local persistence.
|
||||
* - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
|
||||
* without information about its previous state.
|
||||
* - TSDB_SMA_STAT_DROPPED: 1)sma dropped
|
||||
*/
|
||||
int8_t state; // ETsdbSmaStat
|
||||
SHashObj *expiredWindows; // key: skey of time window, value: N/A
|
||||
|
@ -80,6 +82,7 @@ struct SSmaStat {
|
|||
// expired window
|
||||
static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg);
|
||||
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
|
||||
static void * tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem);
|
||||
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
|
||||
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did);
|
||||
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv);
|
||||
|
@ -109,7 +112,55 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[])
|
|||
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
|
||||
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
|
||||
|
||||
// mgmt interface
|
||||
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid);
|
||||
|
||||
// implementation
|
||||
static FORCE_INLINE int8_t tsdbSmaStat(SSmaStatItem *pStatItem) {
|
||||
if (pStatItem) {
|
||||
return atomic_load_8(&pStatItem->state);
|
||||
}
|
||||
return TSDB_SMA_STAT_UNKNOWN;
|
||||
}
|
||||
|
||||
static FORCE_INLINE bool tsdbSmaStatIsOK(SSmaStatItem *pStatItem, int8_t *state) {
|
||||
if(!pStatItem) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (state) {
|
||||
*state = atomic_load_8(&pStatItem->state);
|
||||
return *state == TSDB_SMA_STAT_OK;
|
||||
}
|
||||
return atomic_load_8(&pStatItem->state) == TSDB_SMA_STAT_OK;
|
||||
}
|
||||
|
||||
static FORCE_INLINE bool tsdbSmaStatIsExpired(SSmaStatItem *pStatItem) {
|
||||
return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_EXPIRED) : true;
|
||||
}
|
||||
|
||||
static FORCE_INLINE bool tsdbSmaStatIsDropped(SSmaStatItem *pStatItem) {
|
||||
return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_DROPPED) : true;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tsdbSmaStatSetOK(SSmaStatItem *pStatItem) {
|
||||
if (pStatItem) {
|
||||
atomic_store_8(&pStatItem->state, TSDB_SMA_STAT_OK);
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tsdbSmaStatSetExpired(SSmaStatItem *pStatItem) {
|
||||
if (pStatItem) {
|
||||
atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_EXPIRED);
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tsdbSmaStatSetDropped(SSmaStatItem *pStatItem) {
|
||||
if (pStatItem) {
|
||||
atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_DROPPED);
|
||||
}
|
||||
}
|
||||
|
||||
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%stsdb%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TD_DIRSEP,
|
||||
TSDB_SMA_DNAME[smaType]);
|
||||
|
@ -252,6 +303,16 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
|
|||
return pItem;
|
||||
}
|
||||
|
||||
static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
|
||||
if (pSmaStatItem != NULL) {
|
||||
tdDestroyTSma(pSmaStatItem->pSma);
|
||||
tfree(pSmaStatItem->pSma);
|
||||
taosHashCleanup(pSmaStatItem->expiredWindows);
|
||||
tfree(pSmaStatItem);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Release resources allocated for its member fields, not including itself.
|
||||
*
|
||||
|
@ -264,12 +325,7 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
|
|||
void *item = taosHashIterate(pSmaStat->smaStatItems, NULL);
|
||||
while (item != NULL) {
|
||||
SSmaStatItem *pItem = *(SSmaStatItem **)item;
|
||||
if (pItem != NULL) {
|
||||
tdDestroyTSma(pItem->pSma);
|
||||
tfree(pItem->pSma);
|
||||
taosHashCleanup(pItem->expiredWindows);
|
||||
tfree(pItem);
|
||||
}
|
||||
tsdbFreeSmaStatItem(pItem);
|
||||
item = taosHashIterate(pSmaStat->smaStatItems, item);
|
||||
}
|
||||
taosHashCleanup(pSmaStat->smaStatItems);
|
||||
|
@ -711,6 +767,148 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
|||
STsdbCfg * pCfg = REPO_CFG(pTsdb);
|
||||
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
|
||||
SSmaEnv * pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
|
||||
int64_t indexUid = SMA_TEST_INDEX_UID;
|
||||
|
||||
|
||||
if (pEnv == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (pData->dataLen <= 0) {
|
||||
TASSERT(0);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
STSmaWriteH tSmaH = {0};
|
||||
|
||||
if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData) != 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pTsdb->pTSmaEnv);
|
||||
SSmaStatItem *pItem = NULL;
|
||||
|
||||
tsdbRefSmaStat(pTsdb, pStat);
|
||||
|
||||
if (pStat && pStat->smaStatItems) {
|
||||
pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
|
||||
}
|
||||
if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL) || tsdbSmaStatIsDropped(pItem)) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
|
||||
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
char rPath[TSDB_FILENAME_LEN] = {0};
|
||||
char aPath[TSDB_FILENAME_LEN] = {0};
|
||||
snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
|
||||
tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath);
|
||||
if (!taosCheckExistFile(aPath)) {
|
||||
if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) {
|
||||
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
// Step 1: Judge the storage level and days
|
||||
int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
|
||||
int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
|
||||
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
|
||||
|
||||
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
|
||||
// - Set and open the DFile or the B+Tree file
|
||||
// TODO: tsdbStartTSmaCommit();
|
||||
tsdbSetTSmaDataFile(&tSmaH, pData, indexUid, fid);
|
||||
if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) {
|
||||
tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
|
||||
tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
|
||||
tsdbDestroyTSmaWriteH(&tSmaH);
|
||||
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) {
|
||||
tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
|
||||
tsdbDestroyTSmaWriteH(&tSmaH);
|
||||
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
// TODO:tsdbEndTSmaCommit();
|
||||
|
||||
// Step 3: reset the SSmaStat
|
||||
tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
|
||||
|
||||
tsdbDestroyTSmaWriteH(&tSmaH);
|
||||
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Drop tSma data and local cache
|
||||
* - insert/query reference
|
||||
* @param pTsdb
|
||||
* @param msg
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) {
|
||||
SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
|
||||
|
||||
// clear local cache
|
||||
if (pEnv) {
|
||||
tsdbDebug("vgId:%d drop tSma local cache for %" PRIi64, REPO_ID(pTsdb), indexUid);
|
||||
|
||||
SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid));
|
||||
if ((pItem != NULL) || ((pItem = *(SSmaStatItem **)pItem) != NULL)) {
|
||||
if (tsdbSmaStatIsDropped(pItem)) {
|
||||
tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid);
|
||||
return TSDB_CODE_TDB_INVALID_ACTION; // TODO: duplicate drop msg should be intercept by mnode
|
||||
}
|
||||
|
||||
tsdbWLockSma(pEnv);
|
||||
if (tsdbSmaStatIsDropped(pItem)) {
|
||||
tsdbUnLockSma(pEnv);
|
||||
tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid);
|
||||
return TSDB_CODE_TDB_INVALID_ACTION; // TODO: duplicate drop msg should be intercept by mnode
|
||||
}
|
||||
tsdbSmaStatSetDropped(pItem);
|
||||
tsdbUnLockSma(pEnv);
|
||||
|
||||
int32_t nSleep = 0;
|
||||
while (true) {
|
||||
if (T_REF_VAL_GET(SMA_ENV_STAT(pEnv)) <= 0) {
|
||||
break;
|
||||
}
|
||||
taosSsleep(1);
|
||||
if (++nSleep > SMA_DROP_EXPIRED_TIME) {
|
||||
break;
|
||||
};
|
||||
}
|
||||
|
||||
tsdbFreeSmaStatItem(pItem);
|
||||
tsdbDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64 " in local cache", REPO_ID(pTsdb), indexUid);
|
||||
}
|
||||
}
|
||||
// clear sma data files
|
||||
// TODO:
|
||||
}
|
||||
|
||||
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) {
|
||||
STsdb *pTsdb = pSmaH->pTsdb;
|
||||
|
||||
char tSmaFile[TSDB_FILENAME_LEN] = {0};
|
||||
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid);
|
||||
pSmaH->dFile.path = strdup(tSmaFile);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
|
||||
STsdbCfg * pCfg = REPO_CFG(pTsdb);
|
||||
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
|
||||
SSmaEnv * pEnv = atomic_load_ptr(&pTsdb->pRSmaEnv);
|
||||
|
||||
if (pEnv == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
|
@ -771,51 +969,6 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) {
|
||||
STsdb *pTsdb = pSmaH->pTsdb;
|
||||
|
||||
char tSmaFile[TSDB_FILENAME_LEN] = {0};
|
||||
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid);
|
||||
pSmaH->dFile.path = strdup(tSmaFile);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
|
||||
STsdbCfg * pCfg = REPO_CFG(pTsdb);
|
||||
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
|
||||
STSmaWriteH tSmaH = {0};
|
||||
|
||||
tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData);
|
||||
|
||||
if (pData->dataLen <= 0) {
|
||||
TASSERT(0);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
// Step 1: Judge the storage level
|
||||
int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
|
||||
int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile;
|
||||
|
||||
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
|
||||
// - Set and open the DFile or the B+Tree file
|
||||
|
||||
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
|
||||
|
||||
// Save all the TSma data to one file
|
||||
// TODO: tsdbStartTSmaCommit();
|
||||
tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
|
||||
|
||||
tsdbInsertTSmaDataSection(&tSmaH, pData);
|
||||
// TODO:tsdbEndTSmaCommit();
|
||||
|
||||
// reset the SSmaStat
|
||||
tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief
|
||||
*
|
||||
|
@ -934,6 +1087,15 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
|
|||
#endif
|
||||
|
||||
#if 1
|
||||
int8_t smaStat = 0;
|
||||
if (!tsdbSmaStatIsOK(pItem, &smaStat)) { // TODO: multiple check for large scale sma query
|
||||
tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
|
||||
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
|
||||
tsdbDebug("vgId:%d getTSmaDataImpl failed from index %" PRIi64 " since %s %" PRIi8, REPO_ID(pTsdb), indexUid,
|
||||
tstrerror(terrno), smaStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY)) != NULL) {
|
||||
// TODO: mark this window as expired.
|
||||
tsdbDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, REPO_ID(pTsdb),
|
||||
|
@ -1086,6 +1248,20 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
|
|||
return code;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Get tSma data
|
||||
*
|
||||
* @param pTsdb
|
||||
* @param pData
|
||||
* @param indexUid
|
||||
* @param interval
|
||||
* @param intervalUnit
|
||||
* @param tableUid
|
||||
* @param colId
|
||||
* @param querySKey
|
||||
* @param nMaxResult
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit,
|
||||
tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -1094,4 +1270,19 @@ int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid,
|
|||
tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Drop tSma Data and caches
|
||||
*
|
||||
* @param pTsdb
|
||||
* @param msg
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if ((code = tsdbDropTSmaDataImpl(pTsdb, indexUid)) < 0) {
|
||||
tsdbWarn("vgId:%d drop tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
|
||||
}
|
||||
return code;
|
||||
}
|
|
@ -199,15 +199,23 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexName) < 0) {
|
||||
// TODO: handle error
|
||||
return -1;
|
||||
}
|
||||
// TODO: send msg to stream computing to drop tSma
|
||||
// if ((send msg to stream computing) < 0) {
|
||||
// tdDestroyTSma(&vCreateSmaReq);
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
|
||||
if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexUid) < 0) {
|
||||
// TODO: handle error
|
||||
return -1;
|
||||
}
|
||||
|
||||
if(tsdbDropTSmaData(pVnode->pTsdb, vDropSmaReq.indexUid) < 0) {
|
||||
// TODO: handle error
|
||||
return -1;
|
||||
}
|
||||
|
||||
// TODO: return directly or go on follow steps?
|
||||
} break;
|
||||
default:
|
||||
|
|
|
@ -272,8 +272,8 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
|
|||
taosArrayDestroy(pUids);
|
||||
|
||||
// resource release
|
||||
metaRemoveSmaFromDb(pMeta, smaIndexName1);
|
||||
metaRemoveSmaFromDb(pMeta, smaIndexName2);
|
||||
metaRemoveSmaFromDb(pMeta, indexUid1);
|
||||
metaRemoveSmaFromDb(pMeta, indexUid2);
|
||||
|
||||
tdDestroyTSma(&tSma);
|
||||
metaClose(pMeta);
|
||||
|
|
|
@ -332,8 +332,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECREATED, "Table re-created")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_SMA_STAT, "Invalid sma state")
|
||||
|
||||
// query
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")
|
||||
|
|
Loading…
Reference in New Issue