use dataDir to init sma env
This commit is contained in:
parent
5705b76e3b
commit
b44ef3c81e
|
@ -198,6 +198,16 @@ void tfsBasename(const STfsFile *pFile, char *dest);
|
||||||
*/
|
*/
|
||||||
void tfsDirname(const STfsFile *pFile, char *dest);
|
void tfsDirname(const STfsFile *pFile, char *dest);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Get the absolute file name of rname.
|
||||||
|
*
|
||||||
|
* @param pTfs
|
||||||
|
* @param diskId
|
||||||
|
* @param rname relative file name
|
||||||
|
* @param aname absolute file name
|
||||||
|
*/
|
||||||
|
void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Remove file in tfs.
|
* @brief Remove file in tfs.
|
||||||
*
|
*
|
||||||
|
|
|
@ -27,7 +27,7 @@ static struct {
|
||||||
} dmn = {0};
|
} dmn = {0};
|
||||||
|
|
||||||
static void dmnSigintHandle(int signum, void *info, void *ctx) {
|
static void dmnSigintHandle(int signum, void *info, void *ctx) {
|
||||||
uInfo("singal:%d is received", signum);
|
uInfo("signal:%d is received", signum);
|
||||||
dmn.stop = true;
|
dmn.stop = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,8 +20,7 @@ static const char *TSDB_SMA_DNAME[] = {
|
||||||
"tsma", // TSDB_SMA_TYPE_TIME_RANGE
|
"tsma", // TSDB_SMA_TYPE_TIME_RANGE
|
||||||
"rsma", // TSDB_SMA_TYPE_ROLLUP
|
"rsma", // TSDB_SMA_TYPE_ROLLUP
|
||||||
};
|
};
|
||||||
#define SMA_CHECK_HASH
|
#undef _TEST_SMA_PRINT_DEBUG_LOG_
|
||||||
#undef SMA_PRINT_DEBUG_LOG
|
|
||||||
#define SMA_STORAGE_TSDB_DAYS 30
|
#define SMA_STORAGE_TSDB_DAYS 30
|
||||||
#define SMA_STORAGE_TSDB_TIMES 10
|
#define SMA_STORAGE_TSDB_TIMES 10
|
||||||
#define SMA_STORAGE_SPLIT_HOURS 24
|
#define SMA_STORAGE_SPLIT_HOURS 24
|
||||||
|
@ -33,8 +32,8 @@ static const char *TSDB_SMA_DNAME[] = {
|
||||||
#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test
|
#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test
|
||||||
#define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test
|
#define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test
|
||||||
typedef enum {
|
typedef enum {
|
||||||
SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2t200.dat
|
SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f200.tsma
|
||||||
SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/rsma/sma_index_uid/v2r200.dat
|
SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma
|
||||||
} ESmaStorageLevel;
|
} ESmaStorageLevel;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -47,6 +46,7 @@ typedef struct {
|
||||||
int32_t iter;
|
int32_t iter;
|
||||||
int32_t fid;
|
int32_t fid;
|
||||||
} SmaFsIter;
|
} SmaFsIter;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STsdb * pTsdb;
|
STsdb * pTsdb;
|
||||||
SDBFile dFile;
|
SDBFile dFile;
|
||||||
|
@ -71,11 +71,12 @@ typedef struct {
|
||||||
} SSmaStatItem;
|
} SSmaStatItem;
|
||||||
|
|
||||||
struct SSmaStat {
|
struct SSmaStat {
|
||||||
SHashObj *smaStatItems; // key: indexName, value: SSmaStatItem
|
SHashObj *smaStatItems; // key: indexUid, value: SSmaStatItem
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
};
|
};
|
||||||
|
|
||||||
// declaration of static functions
|
// declaration of static functions
|
||||||
|
|
||||||
// expired window
|
// expired window
|
||||||
static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg);
|
static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg);
|
||||||
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
|
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
|
||||||
|
@ -159,22 +160,12 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbLockRepo(pTsdb) != 0) {
|
if (*pEnv == NULL) {
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (*pEnv == NULL) { // 2nd phase check
|
|
||||||
if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) {
|
if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) {
|
||||||
tsdbUnlockRepo(pTsdb);
|
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbUnlockRepo(pTsdb) != 0) {
|
|
||||||
*pEnv = tsdbFreeSmaEnv(*pEnv);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -279,6 +270,7 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
|
static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
|
||||||
|
// return if already init
|
||||||
switch (smaType) {
|
switch (smaType) {
|
||||||
case TSDB_SMA_TYPE_TIME_RANGE:
|
case TSDB_SMA_TYPE_TIME_RANGE:
|
||||||
if (pTsdb->pTSmaEnv) {
|
if (pTsdb->pTSmaEnv) {
|
||||||
|
@ -295,18 +287,39 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
// SDiskID did = {0};
|
// init sma env
|
||||||
SSmaEnv *pEnv = NULL;
|
tsdbLockRepo(pTsdb);
|
||||||
char smaPath[TSDB_FILENAME_LEN] = "/proj/.sma/";
|
if (pTsdb->pTSmaEnv == NULL) {
|
||||||
if (tsdbInitSmaEnv(pTsdb, smaPath, &pEnv) != TSDB_CODE_SUCCESS) {
|
char rname[TSDB_FILENAME_LEN] = {0};
|
||||||
return TSDB_CODE_FAILED;
|
char aname[TSDB_FILENAME_LEN * 2 + 32] = {0}; // TODO: make TMPNAME_LEN public as TSDB_FILENAME_LEN?
|
||||||
}
|
|
||||||
|
|
||||||
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
SDiskID did = {0};
|
||||||
pTsdb->pTSmaEnv = pEnv;
|
tfsAllocDisk(pTsdb->pTfs, TFS_PRIMARY_LEVEL, &did);
|
||||||
} else {
|
if (did.level < 0 || did.id < 0) {
|
||||||
pTsdb->pRSmaEnv = pEnv;
|
tsdbUnlockRepo(pTsdb);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
tsdbGetSmaDir(REPO_ID(pTsdb), smaType, rname);
|
||||||
|
tfsAbsoluteName(pTsdb->pTfs, did, rname, aname);
|
||||||
|
|
||||||
|
if (tfsMkdirRecurAt(pTsdb->pTfs, rname, did) != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbUnlockRepo(pTsdb);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSmaEnv *pEnv = NULL;
|
||||||
|
if (tsdbInitSmaEnv(pTsdb, aname, &pEnv) != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbUnlockRepo(pTsdb);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||||
|
pTsdb->pTSmaEnv = pEnv;
|
||||||
|
} else {
|
||||||
|
pTsdb->pRSmaEnv = pEnv;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
tsdbUnlockRepo(pTsdb);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
};
|
};
|
||||||
|
@ -379,7 +392,7 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
|
||||||
|
|
||||||
int8_t state = TSDB_SMA_STAT_EXPIRED;
|
int8_t state = TSDB_SMA_STAT_EXPIRED;
|
||||||
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
|
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
|
||||||
if (taosHashPut(pItem->expiredWindows, (void *)(expiredWindows + i), sizeof(TSKEY), &state, sizeof(state)) != 0) {
|
if (taosHashPut(pItem->expiredWindows, expiredWindows + i, sizeof(TSKEY), &state, sizeof(state)) != 0) {
|
||||||
// If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
|
// If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
|
||||||
// tell query module to query raw TS data.
|
// tell query module to query raw TS data.
|
||||||
// N.B.
|
// N.B.
|
||||||
|
@ -497,12 +510,12 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t k
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef SMA_PRINT_DEBUG_LOG
|
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
|
||||||
uint32_t valueSize = 0;
|
uint32_t valueSize = 0;
|
||||||
void * data = tsdbGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize);
|
void * data = tsdbGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize);
|
||||||
ASSERT(data != NULL);
|
ASSERT(data != NULL);
|
||||||
for (uint32_t v = 0; v < valueSize; v += 8) {
|
for (uint32_t v = 0; v < valueSize; v += 8) {
|
||||||
tsdbWarn("vgId:%d sma data - val[%d] is %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v));
|
tsdbWarn("vgId:%d insert sma data val[%d] %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v));
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -935,12 +948,14 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
|
||||||
tsdbCloseDBF(&tReadH.dFile);
|
tsdbCloseDBF(&tReadH.dFile);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
tfree(result);
|
|
||||||
#ifdef SMA_PRINT_DEBUG_LOG
|
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
|
||||||
for (uint32_t v = 0; v < valueSize; v += 8) {
|
for (uint32_t v = 0; v < valueSize; v += 8) {
|
||||||
tsdbWarn("vgId:%d v[%d]=%" PRIi64, REPO_ID(pTsdb), v, *(int64_t *)POINTER_SHIFT(result, v));
|
tsdbWarn("vgId:%d get sma data v[%d]=%" PRIi64, REPO_ID(pTsdb), v, *(int64_t *)POINTER_SHIFT(result, v));
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
tfree(result); // TODO: fill the result to output
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
int32_t nResult = 0;
|
int32_t nResult = 0;
|
||||||
int64_t lastKey = 0;
|
int64_t lastKey = 0;
|
||||||
|
|
|
@ -301,8 +301,14 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SDiskCfg pDisks = {.level = 0, .primary = 1};
|
||||||
|
strncpy(pDisks.dir, "/var/lib/taos", TSDB_FILENAME_LEN);
|
||||||
|
int32_t numOfDisks = 1;
|
||||||
|
tsdb.pTfs = tfsOpen(&pDisks, numOfDisks);
|
||||||
|
ASSERT_NE(tsdb.pTfs, nullptr);
|
||||||
|
|
||||||
char *msg = (char *)calloc(1, 100);
|
char *msg = (char *)calloc(1, 100);
|
||||||
assert(msg != NULL);
|
ASSERT_NE(msg, nullptr);
|
||||||
ASSERT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
|
ASSERT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
|
||||||
|
|
||||||
// init
|
// init
|
||||||
|
|
|
@ -202,6 +202,11 @@ void tfsDirname(const STfsFile *pFile, char *dest) {
|
||||||
tstrncpy(dest, taosDirName(tname), TSDB_FILENAME_LEN);
|
tstrncpy(dest, taosDirName(tname), TSDB_FILENAME_LEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname) {
|
||||||
|
STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId);
|
||||||
|
snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tfsRemoveFile(const STfsFile *pFile) { return taosRemoveFile(pFile->aname); }
|
int32_t tfsRemoveFile(const STfsFile *pFile) { return taosRemoveFile(pFile->aname); }
|
||||||
|
|
||||||
int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2) {
|
int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2) {
|
||||||
|
|
|
@ -378,7 +378,7 @@ class TdeSubProcess:
|
||||||
@classmethod
|
@classmethod
|
||||||
def _stopForSure(cls, proc: Popen, sig: int):
|
def _stopForSure(cls, proc: Popen, sig: int):
|
||||||
'''
|
'''
|
||||||
Stop a process and all sub processes with a singal, and SIGKILL if necessary
|
Stop a process and all sub processes with a signal, and SIGKILL if necessary
|
||||||
'''
|
'''
|
||||||
def doKillTdService(proc: Popen, sig: int):
|
def doKillTdService(proc: Popen, sig: int):
|
||||||
Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig))
|
Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig))
|
||||||
|
|
Loading…
Reference in New Issue