register rsma info
This commit is contained in:
parent
739b27700a
commit
d3857cbc55
|
@ -1483,8 +1483,8 @@ typedef struct {
|
|||
int32_t qmsg1Len;
|
||||
int32_t qmsg2Len;
|
||||
func_id_t* pFuncIds;
|
||||
char* qmsg1; // not null: pAst1:qmsg1:SRetention1 => trigger aggr task1
|
||||
char* qmsg2; // not null: pAst2:qmsg2:SRetention2 => trigger aggr task2
|
||||
char* qmsg1; // pAst1:qmsg1:SRetention1 => trigger aggr task1
|
||||
char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2
|
||||
int8_t nFuncIds;
|
||||
} SRSmaParam;
|
||||
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_VNODE_TSDB_SMA_H_
|
||||
#define _TD_VNODE_TSDB_SMA_H_
|
||||
|
||||
#include "os.h"
|
||||
#include "thash.h"
|
||||
#include "tmsg.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_VNODE_TSDB_SMA_H_*/
|
|
@ -113,6 +113,8 @@ void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
|
|||
|
||||
#include "tq.h"
|
||||
|
||||
#include "tsdbSma.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -33,6 +33,8 @@ static const char *TSDB_SMA_DNAME[] = {
|
|||
|
||||
#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test
|
||||
#define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test
|
||||
|
||||
typedef struct SRSmaInfo SRSmaInfo;
|
||||
typedef enum {
|
||||
SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f200.tsma
|
||||
SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma
|
||||
|
@ -98,9 +100,38 @@ typedef struct {
|
|||
} SSmaStatItem;
|
||||
|
||||
struct SSmaStat {
|
||||
SHashObj *smaStatItems; // key: indexUid, value: SSmaStatItem
|
||||
union {
|
||||
SHashObj *smaStatItems; // key: indexUid, value: SSmaStatItem for tsma
|
||||
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
|
||||
};
|
||||
|
||||
T_REF_DECLARE()
|
||||
};
|
||||
#define SSMA_STAT_ITEMS(s) ((s)->smaStatItems)
|
||||
#define SSMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
|
||||
|
||||
#define RSMA_MAX_LEVEL 2
|
||||
#define RSMA_TASK_INFO_HASH_SLOT 8
|
||||
struct SRSmaInfo {
|
||||
void *taskInfo[RSMA_MAX_LEVEL]; // qTaskInfo_t
|
||||
};
|
||||
|
||||
static FORCE_INLINE void tsdbFreeTaskHandle(qTaskInfo_t *taskHandle) {
|
||||
// Note: free/kill may in RC
|
||||
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
|
||||
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
|
||||
qDestroyTask(otaskHandle);
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE void *tsdbFreeRSmaInfo(SRSmaInfo *pInfo) {
|
||||
for (int32_t i = 0; i < RSMA_MAX_LEVEL; ++i) {
|
||||
if (pInfo->taskInfo[i]) {
|
||||
tsdbFreeTaskHandle(pInfo->taskInfo[i]);
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// declaration of static functions
|
||||
|
||||
|
@ -108,11 +139,11 @@ struct SSmaStat {
|
|||
static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version);
|
||||
static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
|
||||
int64_t version);
|
||||
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
|
||||
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat, int8_t smaType);
|
||||
static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem);
|
||||
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
|
||||
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did);
|
||||
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv);
|
||||
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did);
|
||||
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did, SSmaEnv **pEnv);
|
||||
static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
|
||||
static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);
|
||||
static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);
|
||||
|
@ -317,7 +348,7 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
|
|||
snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TSDB_SMA_DNAME[smaType]);
|
||||
}
|
||||
|
||||
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did) {
|
||||
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did) {
|
||||
SSmaEnv *pEnv = NULL;
|
||||
|
||||
pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv));
|
||||
|
@ -342,7 +373,7 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did)
|
|||
|
||||
pEnv->did = did;
|
||||
|
||||
if (tsdbInitSmaStat(&pEnv->pStat) != TSDB_CODE_SUCCESS) {
|
||||
if (tsdbInitSmaStat(&pEnv->pStat, smaType) != TSDB_CODE_SUCCESS) {
|
||||
tsdbFreeSmaEnv(pEnv);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -362,14 +393,14 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did)
|
|||
return pEnv;
|
||||
}
|
||||
|
||||
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv) {
|
||||
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did, SSmaEnv **pEnv) {
|
||||
if (!pEnv) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (*pEnv == NULL) {
|
||||
if ((*pEnv = tsdbNewSmaEnv(pTsdb, path, did)) == NULL) {
|
||||
if ((*pEnv = tsdbNewSmaEnv(pTsdb, smaType, path, did)) == NULL) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
@ -416,7 +447,7 @@ static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
|
||||
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
|
||||
ASSERT(pSmaStat != NULL);
|
||||
|
||||
if (*pSmaStat != NULL) { // no lock
|
||||
|
@ -435,12 +466,24 @@ static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
(*pSmaStat)->smaStatItems =
|
||||
taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
if (smaType == TSDB_SMA_TYPE_ROLLUP) {
|
||||
SSMA_STAT_INFO_HASH(*pSmaStat) = taosHashInit(
|
||||
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||
|
||||
if ((*pSmaStat)->smaStatItems == NULL) {
|
||||
taosMemoryFreeClear(*pSmaStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
if (SSMA_STAT_INFO_HASH(*pSmaStat) == NULL) {
|
||||
taosMemoryFreeClear(*pSmaStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||
SSMA_STAT_ITEMS(*pSmaStat) =
|
||||
taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
|
||||
if (SSMA_STAT_ITEMS(*pSmaStat) == NULL) {
|
||||
taosMemoryFreeClear(*pSmaStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -531,7 +574,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (tsdbInitSmaEnv(pTsdb, rname, did, &pEnv) != TSDB_CODE_SUCCESS) {
|
||||
if (tsdbInitSmaEnv(pTsdb, smaType, rname, did, &pEnv) != TSDB_CODE_SUCCESS) {
|
||||
tsdbUnlockRepo(pTsdb);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -1290,12 +1333,6 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
if (pEnv == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
tsdbWarn("vgId:%d insert rSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (pDataBlocks == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
tsdbWarn("vgId:%d insert rSma data failed since pDataBlocks is NULL", REPO_ID(pTsdb));
|
||||
|
@ -1633,6 +1670,87 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Only applicable to stable.
|
||||
*
|
||||
* @param pTsdb
|
||||
* @param pMeta
|
||||
* @param pReq
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
|
||||
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
|
||||
|
||||
if (param == NULL) {
|
||||
tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name,
|
||||
pReq->stbCfg.suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) {
|
||||
tsdbWarn("vgId:%d no qmsg1/qmsg2 for rollup stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->stbCfg.suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) {
|
||||
terrno = TSDB_CODE_TDB_INIT_FAILED;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb);
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||
SRSmaInfo *pRSmaInfo = NULL;
|
||||
|
||||
TASSERT(pEnv != NULL && pStat != NULL);
|
||||
|
||||
pRSmaInfo = taosHashGet(pStat->rsmaInfoHash, &pReq->stbCfg.suid, sizeof(tb_uid_t));
|
||||
if (pRSmaInfo != NULL) {
|
||||
pRSmaInfo = tsdbFreeRSmaInfo(pRSmaInfo);
|
||||
}
|
||||
|
||||
STqReadHandle *pReadHandle = tqInitSubmitMsgScanner(pMeta);
|
||||
if (pReadHandle == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo));
|
||||
if (pRSmaInfo == NULL) {
|
||||
taosMemoryFree(pReadHandle);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SReadHandle handle = {
|
||||
.reader = pReadHandle,
|
||||
.meta = pMeta,
|
||||
};
|
||||
|
||||
if (param->qmsg1) {
|
||||
pRSmaInfo->taskInfo[0] = qCreateStreamExecTaskInfo(param->qmsg1, &handle);
|
||||
if (pRSmaInfo->taskInfo[0] == NULL) {
|
||||
taosMemoryFree(pRSmaInfo);
|
||||
taosMemoryFree(pReadHandle);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
if (param->qmsg2) {
|
||||
pRSmaInfo->taskInfo[1] = qCreateStreamExecTaskInfo(param->qmsg2, &handle);
|
||||
if (pRSmaInfo->taskInfo[1] == NULL) {
|
||||
taosMemoryFree(pRSmaInfo);
|
||||
taosMemoryFree(pReadHandle);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
if (taosHashPut(pStat->rsmaInfoHash, &pReq->stbCfg.suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#if 0
|
||||
/**
|
||||
* @brief Get the start TS key of the last data block of one interval/sliding.
|
||||
|
|
|
@ -215,14 +215,8 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// TODO: remove the debug log
|
||||
SRSmaParam *param = vCreateTbReq.stbCfg.pRSmaParam;
|
||||
if (param) {
|
||||
printf("qmsg1 len = %d, body = %s\n", param->qmsg1 ? (int32_t)strlen(param->qmsg1) : 0,
|
||||
param->qmsg1 ? param->qmsg1 : "");
|
||||
printf("qmsg1 len = %d, body = %s\n", param->qmsg2 ? (int32_t)strlen(param->qmsg2) : 0,
|
||||
param->qmsg2 ? param->qmsg2 : "");
|
||||
}
|
||||
// deploy Rollup SMA
|
||||
tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &vCreateTbReq);
|
||||
|
||||
taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
|
||||
taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema);
|
||||
|
|
Loading…
Reference in New Issue