add SSmaStat in tsdb to demonstrate window valid status
This commit is contained in:
parent
527e1b29c0
commit
aad01e0155
|
@ -50,12 +50,17 @@ typedef enum {
|
||||||
TSDB_CHECK_ITEM_MAX
|
TSDB_CHECK_ITEM_MAX
|
||||||
} ECheckItemType;
|
} ECheckItemType;
|
||||||
|
|
||||||
typedef enum { TD_ROW_DISCARD_UPDATE = 0, TD_ROW_OVERWRITE_UPDATE = 1, TD_ROW_PARTIAL_UPDATE = 2 } TDUpdateConfig;
|
typedef enum { TD_ROW_DISCARD_UPDATE = 0, TD_ROW_OVERWRITE_UPDATE = 1, TD_ROW_PARTIAL_UPDATE = 2} TDUpdateConfig;
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_STATIS_OK = 0, // statis part exist and load successfully
|
TSDB_STATIS_OK = 0, // statis part exist and load successfully
|
||||||
TSDB_STATIS_NONE = 1, // statis part not exist
|
TSDB_STATIS_NONE = 1, // statis part not exist
|
||||||
} ETsdbStatisStatus;
|
} ETsdbStatisStatus;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
TSDB_SMA_STAT_OK = 0, // ready to provide service
|
||||||
|
TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired
|
||||||
|
} ETsdbSmaStat;
|
||||||
|
|
||||||
extern char *qtypeStr[];
|
extern char *qtypeStr[];
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -52,6 +52,7 @@ struct STsdb {
|
||||||
STsdbFS * fs;
|
STsdbFS * fs;
|
||||||
SMeta * pMeta;
|
SMeta * pMeta;
|
||||||
STfs * pTfs;
|
STfs * pTfs;
|
||||||
|
SSmaStat * pSmaStat;
|
||||||
};
|
};
|
||||||
|
|
||||||
#define REPO_ID(r) ((r)->vgId)
|
#define REPO_ID(r) ((r)->vgId)
|
||||||
|
|
|
@ -42,7 +42,10 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STsdbFSMeta meta; // FS meta
|
STsdbFSMeta meta; // FS meta
|
||||||
SArray * df; // data file array
|
SArray * df; // data file array
|
||||||
SArray * smaf; // sma data file array
|
|
||||||
|
// SArray * v2f100.tsma.index_name
|
||||||
|
|
||||||
|
SArray * smaf; // sma data file array v2f1900.tsma.index_name
|
||||||
} SFSStatus;
|
} SFSStatus;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
#ifndef _TD_TSDB_SMA_H_
|
#ifndef _TD_TSDB_SMA_H_
|
||||||
#define _TD_TSDB_SMA_H_
|
#define _TD_TSDB_SMA_H_
|
||||||
|
|
||||||
|
typedef struct SSmaStat SSmaStat;
|
||||||
|
|
||||||
// insert/update interface
|
// insert/update interface
|
||||||
int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData);
|
int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData);
|
||||||
int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData);
|
int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData);
|
||||||
|
@ -26,13 +28,14 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData);
|
||||||
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult);
|
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult);
|
||||||
|
|
||||||
// management interface
|
// management interface
|
||||||
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void* result);
|
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg);
|
||||||
|
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result);
|
||||||
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
|
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
|
||||||
|
int32_t tsdbFreeSmaState(SSmaStat *pSmaStat);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// internal func
|
// internal func
|
||||||
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tsdbEncodeTSmaKey(uint64_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) {
|
static FORCE_INLINE int32_t tsdbEncodeTSmaKey(uint64_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
len += taosEncodeFixedU64(pData, tableUid);
|
len += taosEncodeFixedU64(pData, tableUid);
|
||||||
|
|
|
@ -923,6 +923,7 @@ SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) {
|
||||||
SMetaDB *pDB = pMeta->pDB;
|
SMetaDB *pDB = pMeta->pDB;
|
||||||
DBC * pCur = NULL;
|
DBC * pCur = NULL;
|
||||||
DBT pkey = {0}, pval = {0};
|
DBT pkey = {0}, pval = {0};
|
||||||
|
uint32_t mode = isDup ? DB_NEXT_DUP : DB_NEXT_NODUP;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
pUids = taosArrayInit(16, sizeof(tb_uid_t));
|
pUids = taosArrayInit(16, sizeof(tb_uid_t));
|
||||||
|
@ -941,13 +942,8 @@ SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) {
|
||||||
void *pBuf = NULL;
|
void *pBuf = NULL;
|
||||||
|
|
||||||
// TODO: lock?
|
// TODO: lock?
|
||||||
while (true) {
|
while ((ret = pCur->get(pCur, &pkey, &pval, mode)) == 0) {
|
||||||
ret = pCur->get(pCur, &pkey, &pval, isDup ? DB_NEXT_DUP : DB_NEXT_NODUP);
|
|
||||||
if(ret == 0) {
|
|
||||||
taosArrayPush(pUids, pkey.data);
|
taosArrayPush(pUids, pkey.data);
|
||||||
continue;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCur) {
|
if (pCur) {
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
|
@ -89,6 +89,7 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
|
||||||
static void tsdbFree(STsdb *pTsdb) {
|
static void tsdbFree(STsdb *pTsdb) {
|
||||||
if (pTsdb) {
|
if (pTsdb) {
|
||||||
tsdbFreeFS(pTsdb->fs);
|
tsdbFreeFS(pTsdb->fs);
|
||||||
|
tsdbFreeSmaState(pTsdb->pSmaStat);
|
||||||
tfree(pTsdb->path);
|
tfree(pTsdb->path);
|
||||||
free(pTsdb);
|
free(pTsdb);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,10 @@
|
||||||
|
|
||||||
#define SMA_STORE_SINGLE_BLOCKS // store SMA data by single block or multiple blocks
|
#define SMA_STORE_SINGLE_BLOCKS // store SMA data by single block or multiple blocks
|
||||||
|
|
||||||
|
#define SMA_STATE_HASH_SLOT 4
|
||||||
|
#define SMA_STATE_ITEM_HASH_SLOT 32
|
||||||
|
|
||||||
|
#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test
|
||||||
typedef enum {
|
typedef enum {
|
||||||
SMA_STORAGE_LEVEL_TSDB = 0, // store TSma in dir e.g. vnode${N}/tsdb/.tsma
|
SMA_STORAGE_LEVEL_TSDB = 0, // store TSma in dir e.g. vnode${N}/tsdb/.tsma
|
||||||
SMA_STORAGE_LEVEL_DFILESET = 1 // store TSma in file e.g. vnode${N}/tsdb/v2f1900.tsma.${sma_index_name}
|
SMA_STORAGE_LEVEL_DFILESET = 1 // store TSma in file e.g. vnode${N}/tsdb/v2f1900.tsma.${sma_index_name}
|
||||||
|
@ -48,6 +52,22 @@ typedef struct {
|
||||||
// TODO
|
// TODO
|
||||||
} STSmaReadH;
|
} STSmaReadH;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
/**
|
||||||
|
* @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service.
|
||||||
|
* - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
|
||||||
|
* without information about its previous state.
|
||||||
|
* - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from
|
||||||
|
* Streaming Module or TSDB local persistence.
|
||||||
|
*/
|
||||||
|
int8_t state; // ETsdbSmaStat
|
||||||
|
SHashObj *expiredWindows; // key: skey of time window, value: N/A
|
||||||
|
} SSmaStatItem;
|
||||||
|
|
||||||
|
struct SSmaStat {
|
||||||
|
SHashObj *smaStatItems; // key: indexName, value: SSmaStatItem
|
||||||
|
};
|
||||||
|
|
||||||
// declaration of static functions
|
// declaration of static functions
|
||||||
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData);
|
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData);
|
||||||
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData);
|
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData);
|
||||||
|
@ -64,6 +84,114 @@ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param,
|
||||||
static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin);
|
static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin);
|
||||||
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin);
|
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin);
|
||||||
|
|
||||||
|
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
|
||||||
|
ASSERT(pSmaStat != NULL);
|
||||||
|
// TODO: lock and create when each put, or create during tsdbNew.
|
||||||
|
if (*pSmaStat == NULL) {
|
||||||
|
*pSmaStat = (SSmaStat *)calloc(1, sizeof(SSmaStat));
|
||||||
|
if (*pSmaStat == NULL) {
|
||||||
|
// TODO: unlock
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*pSmaStat)->smaStatItems =
|
||||||
|
taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
|
if ((*pSmaStat)->smaStatItems == NULL) {
|
||||||
|
tfree(*pSmaStat);
|
||||||
|
// TODO: unlock
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// TODO: unlock
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
|
||||||
|
SSmaStatItem *pItem = NULL;
|
||||||
|
|
||||||
|
pItem = (SSmaStatItem *)calloc(1, sizeof(SSmaStatItem));
|
||||||
|
if (pItem) {
|
||||||
|
pItem->state = state;
|
||||||
|
pItem->expiredWindows = taosHashInit(SMA_STATE_ITEM_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP),
|
||||||
|
true, HASH_ENTRY_LOCK);
|
||||||
|
if (!pItem->expiredWindows) {
|
||||||
|
tfree(pItem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return pItem;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbFreeSmaState(SSmaStat *pSmaStat) {
|
||||||
|
if (pSmaStat) {
|
||||||
|
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
|
||||||
|
SSmaStatItem *item = taosHashIterate(pSmaStat->smaStatItems, NULL);
|
||||||
|
while (item != NULL) {
|
||||||
|
taosHashCleanup(item->expiredWindows);
|
||||||
|
item = taosHashIterate(pSmaStat->smaStatItems, item);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashCleanup(pSmaStat->smaStatItems);
|
||||||
|
free(pSmaStat);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Update expired window according to msg from stream computing module.
|
||||||
|
*
|
||||||
|
* @param pTsdb
|
||||||
|
* @param msg
|
||||||
|
* @return int32_t
|
||||||
|
*/
|
||||||
|
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
|
||||||
|
if (msg == NULL) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: decode the msg => start
|
||||||
|
const char * indexName = SMA_TEST_INDEX_NAME;
|
||||||
|
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
|
||||||
|
TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
|
||||||
|
int64_t now = taosGetTimestampMs();
|
||||||
|
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
|
||||||
|
expiredWindows[i] = now + i;
|
||||||
|
}
|
||||||
|
// TODO: decode the msg <= end
|
||||||
|
|
||||||
|
SHashObj *pItemsHash = pTsdb->pSmaStat->smaStatItems;
|
||||||
|
|
||||||
|
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, indexName, strlen(indexName));
|
||||||
|
if (!pItem) {
|
||||||
|
pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state
|
||||||
|
if (!pItem) {
|
||||||
|
// Response to stream computing: OOM
|
||||||
|
// For query, if the indexName not found, the TSDB should tell query module to query raw TS data.
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosHashPut(pItemsHash, indexName, strnlen(indexName, TSDB_INDEX_NAME_LEN), &pItem, sizeof(pItem)) != 0) {
|
||||||
|
// If error occurs during put smaStatItem, free the resources of pItem
|
||||||
|
taosHashCleanup(pItem->expiredWindows);
|
||||||
|
free(pItem);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int8_t state = TSDB_SMA_STAT_EXPIRED;
|
||||||
|
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
|
||||||
|
if (taosHashPut(pItem->expiredWindows, &expiredWindows[i], sizeof(TSKEY), &state, sizeof(state)) != 0) {
|
||||||
|
// If error occurs during put expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would tell
|
||||||
|
// query module to query raw TS data.
|
||||||
|
taosHashCleanup(pItem->expiredWindows);
|
||||||
|
taosHashRemove(pItemsHash, indexName, sizeof(indexName));
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Judge the tSma storage level
|
* @brief Judge the tSma storage level
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in New Issue