Merge pull request #12510 from taosdata/feature/TD-14481-3.0

feat: move sma module from tsdb to vnode
This commit is contained in:
Cary Xu 2022-05-15 22:28:02 +08:00 committed by GitHub
commit a3d1b0a50d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 3160 additions and 485 deletions

View File

@ -37,7 +37,8 @@ typedef enum {
TSDB_STREAM_TABLE = 4, // table created from stream computing
TSDB_TEMP_TABLE = 5, // temp table created by nest query
TSDB_SYSTEM_TABLE = 6,
TSDB_TABLE_MAX = 7
TSDB_TSMA_TABLE = 7, // time-range-wise sma
TSDB_TABLE_MAX = 8
} ETableType;
typedef enum {

View File

@ -2160,26 +2160,23 @@ int32_t tSerializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
int32_t tDeserializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
typedef struct {
int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
int8_t timezoneInt; // sma data expired if timezone changes.
char indexName[TSDB_INDEX_NAME_LEN];
int32_t exprLen;
int32_t tagsFilterLen;
int64_t indexUid;
tb_uid_t tableUid; // super/child/common table uid
int64_t interval;
int64_t offset; // use unit by precision of DB
int64_t sliding;
char* expr; // sma expression
char* tagsFilter;
int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
int8_t timezoneInt; // sma data expired if timezone changes.
char indexName[TSDB_INDEX_NAME_LEN];
int32_t exprLen;
int32_t tagsFilterLen;
int64_t indexUid;
tb_uid_t tableUid; // super/child/common table uid
int64_t interval;
int64_t offset; // use unit by precision of DB
int64_t sliding;
const char* expr; // sma expression
const char* tagsFilter;
} STSma; // Time-range-wise SMA
typedef struct {
int64_t ver; // use a general definition
STSma tSma;
} SVCreateTSmaReq;
typedef STSma SVCreateTSmaReq;
typedef struct {
int8_t type; // 0 status report, 1 update data
@ -2188,7 +2185,6 @@ typedef struct {
} STSmaMsg;
typedef struct {
int64_t ver; // use a general definition
int64_t indexUid;
char indexName[TSDB_INDEX_NAME_LEN];
} SVDropTSmaReq;
@ -2197,28 +2193,21 @@ typedef struct {
int tmp; // TODO: to avoid compile error
} SVCreateTSmaRsp, SVDropTSmaRsp;
#if 0
int32_t tSerializeSVCreateTSmaReq(void** buf, SVCreateTSmaReq* pReq);
void* tDeserializeSVCreateTSmaReq(void* buf, SVCreateTSmaReq* pReq);
int32_t tSerializeSVDropTSmaReq(void** buf, SVDropTSmaReq* pReq);
void* tDeserializeSVDropTSmaReq(void* buf, SVDropTSmaReq* pReq);
#endif
// RSma: Rollup SMA
typedef struct {
int64_t interval;
int32_t retention; // unit: day
uint16_t days; // unit: day
int8_t intervalUnit;
} SSmaParams;
int32_t tEncodeSVCreateTSmaReq(SEncoder* pCoder, const SVCreateTSmaReq* pReq);
int32_t tDecodeSVCreateTSmaReq(SDecoder* pCoder, SVCreateTSmaReq* pReq);
int32_t tEncodeSVDropTSmaReq(SEncoder* pCoder, const SVDropTSmaReq* pReq);
int32_t tDecodeSVDropTSmaReq(SDecoder* pCoder, SVDropTSmaReq* pReq);
typedef struct {
STSma tsma;
float xFilesFactor;
SArray* smaParams; // SSmaParams
} SRSma;
typedef struct {
uint32_t number;
STSma* tSma;
int32_t number;
STSma* tSma;
} STSmaWrapper;
static FORCE_INLINE void tdDestroyTSma(STSma* pSma) {
@ -2245,96 +2234,26 @@ static FORCE_INLINE void* tdFreeTSmaWrapper(STSmaWrapper* pSW) {
return NULL;
}
static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
int32_t tlen = 0;
int32_t tEncodeSVCreateTSmaReq(SEncoder* pCoder, const SVCreateTSmaReq* pReq);
int32_t tDecodeSVCreateTSmaReq(SDecoder* pCoder, SVCreateTSmaReq* pReq);
tlen += taosEncodeFixedI8(buf, pSma->version);
tlen += taosEncodeFixedI8(buf, pSma->intervalUnit);
tlen += taosEncodeFixedI8(buf, pSma->slidingUnit);
tlen += taosEncodeFixedI8(buf, pSma->timezoneInt);
tlen += taosEncodeString(buf, pSma->indexName);
tlen += taosEncodeFixedI32(buf, pSma->exprLen);
tlen += taosEncodeFixedI32(buf, pSma->tagsFilterLen);
tlen += taosEncodeFixedI64(buf, pSma->indexUid);
tlen += taosEncodeFixedI64(buf, pSma->tableUid);
tlen += taosEncodeFixedI64(buf, pSma->interval);
tlen += taosEncodeFixedI64(buf, pSma->offset);
tlen += taosEncodeFixedI64(buf, pSma->sliding);
int32_t tEncodeTSma(SEncoder* pCoder, const STSma* pSma);
int32_t tDecodeTSma(SDecoder* pCoder, STSma* pSma);
if (pSma->exprLen > 0) {
tlen += taosEncodeString(buf, pSma->expr);
static int32_t tEncodeTSmaWrapper(SEncoder* pEncoder, const STSmaWrapper* pReq) {
if (tEncodeI32(pEncoder, pReq->number) < 0) return -1;
for (int32_t i = 0; i < pReq->number; ++i) {
tEncodeTSma(pEncoder, pReq->tSma + i);
}
if (pSma->tagsFilterLen > 0) {
tlen += taosEncodeString(buf, pSma->tagsFilter);
}
return tlen;
return 0;
}
static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* pSW) {
int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pSW->number);
for (uint32_t i = 0; i < pSW->number; ++i) {
tlen += tEncodeTSma(buf, pSW->tSma + i);
static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq) {
if (tDecodeI32(pDecoder, &pReq->number) < 0) return -1;
for (int32_t i = 0; i < pReq->number; ++i) {
tDecodeTSma(pDecoder, pReq->tSma + i);
}
return tlen;
}
static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) {
buf = taosDecodeFixedI8(buf, &pSma->version);
buf = taosDecodeFixedI8(buf, &pSma->intervalUnit);
buf = taosDecodeFixedI8(buf, &pSma->slidingUnit);
buf = taosDecodeFixedI8(buf, &pSma->timezoneInt);
buf = taosDecodeStringTo(buf, pSma->indexName);
buf = taosDecodeFixedI32(buf, &pSma->exprLen);
buf = taosDecodeFixedI32(buf, &pSma->tagsFilterLen);
buf = taosDecodeFixedI64(buf, &pSma->indexUid);
buf = taosDecodeFixedI64(buf, &pSma->tableUid);
buf = taosDecodeFixedI64(buf, &pSma->interval);
buf = taosDecodeFixedI64(buf, &pSma->offset);
buf = taosDecodeFixedI64(buf, &pSma->sliding);
if (pSma->exprLen > 0) {
if ((buf = taosDecodeString(buf, &pSma->expr)) == NULL) {
tdDestroyTSma(pSma);
return NULL;
}
} else {
pSma->expr = NULL;
}
if (pSma->tagsFilterLen > 0) {
if ((buf = taosDecodeString(buf, &pSma->tagsFilter)) == NULL) {
tdDestroyTSma(pSma);
return NULL;
}
} else {
pSma->tagsFilter = NULL;
}
return buf;
}
static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) {
buf = taosDecodeFixedU32(buf, &pSW->number);
pSW->tSma = (STSma*)taosMemoryCalloc(pSW->number, sizeof(STSma));
if (pSW->tSma == NULL) {
return NULL;
}
for (uint32_t i = 0; i < pSW->number; ++i) {
if ((buf = tDecodeTSma(buf, pSW->tSma + i)) == NULL) {
for (uint32_t j = i; j >= 0; --i) {
tdDestroyTSma(pSW->tSma + j);
}
taosMemoryFree(pSW->tSma);
return NULL;
}
}
return buf;
return 0;
}
typedef struct {

View File

@ -354,7 +354,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x061A)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061B)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x061C)
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x062D)
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x061D)
#define TSDB_CODE_TDB_TSMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x061E)
// query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)

View File

@ -61,6 +61,7 @@ extern int32_t tqDebugFlag;
extern int32_t fsDebugFlag;
extern int32_t metaDebugFlag;
extern int32_t fnDebugFlag;
extern int32_t smaDebugFlag;
int32_t taosInitLog(const char *logName, int32_t maxFiles);
void taosCloseLog();

View File

@ -299,6 +299,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "fnDebugFlag", fnDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "smaDebugFlag", smaDebugFlag, 0, 255, 0) != 0) return -1;
return 0;
}
@ -480,6 +481,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32;
fsDebugFlag = cfgGetItem(pCfg, "fsDebugFlag")->i32;
fnDebugFlag = cfgGetItem(pCfg, "fnDebugFlag")->i32;
smaDebugFlag = cfgGetItem(pCfg, "smaDebugFlag")->i32;
}
static int32_t taosSetClientCfg(SConfig *pCfg) {

View File

@ -3562,39 +3562,92 @@ int tDecodeSVCreateTbBatchRsp(SDecoder *pCoder, SVCreateTbBatchRsp *pRsp) {
return 0;
}
int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->ver);
tlen += tEncodeTSma(buf, &pReq->tSma);
return tlen;
}
void *tDeserializeSVCreateTSmaReq(void *buf, SVCreateTSmaReq *pReq) {
buf = taosDecodeFixedI64(buf, &(pReq->ver));
if ((buf = tDecodeTSma(buf, &pReq->tSma)) == NULL) {
tdDestroyTSma(&pReq->tSma);
int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
if (tEncodeI8(pCoder, pSma->version) < 0) return -1;
if (tEncodeI8(pCoder, pSma->intervalUnit) < 0) return -1;
if (tEncodeI8(pCoder, pSma->slidingUnit) < 0) return -1;
if (tEncodeI8(pCoder, pSma->timezoneInt) < 0) return -1;
if (tEncodeCStr(pCoder, pSma->indexName) < 0) return -1;
if (tEncodeI32(pCoder, pSma->exprLen) < 0) return -1;
if (tEncodeI32(pCoder, pSma->tagsFilterLen) < 0) return -1;
if (tEncodeI64(pCoder, pSma->indexUid) < 0) return -1;
if (tEncodeI64(pCoder, pSma->tableUid) < 0) return -1;
if (tEncodeI64(pCoder, pSma->interval) < 0) return -1;
if (tEncodeI64(pCoder, pSma->offset) < 0) return -1;
if (tEncodeI64(pCoder, pSma->sliding) < 0) return -1;
if (pSma->exprLen > 0) {
if (tEncodeCStr(pCoder, pSma->expr) < 0) return -1;
}
return buf;
if (pSma->tagsFilterLen > 0) {
if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1;
}
return 0;
}
int32_t tSerializeSVDropTSmaReq(void **buf, SVDropTSmaReq *pReq) {
int32_t tlen = 0;
int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
if (tDecodeI8(pCoder, &pSma->version) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->intervalUnit) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->slidingUnit) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->timezoneInt) < 0) return -1;
if (tDecodeCStrTo(pCoder, pSma->indexName) < 0) return -1;
if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1;
if (tDecodeI32(pCoder, &pSma->tagsFilterLen) < 0) return -1;
if (tDecodeI64(pCoder, &pSma->indexUid) < 0) return -1;
if (tDecodeI64(pCoder, &pSma->tableUid) < 0) return -1;
if (tDecodeI64(pCoder, &pSma->interval) < 0) return -1;
if (tDecodeI64(pCoder, &pSma->offset) < 0) return -1;
if (tDecodeI64(pCoder, &pSma->sliding) < 0) return -1;
if (pSma->exprLen > 0) {
if (tDecodeCStr(pCoder, &pSma->expr) < 0) return -1;
} else {
pSma->expr = NULL;
}
if (pSma->tagsFilterLen > 0) {
if (tDecodeCStr(pCoder, &pSma->tagsFilter) < 0) return -1;
} else {
pSma->tagsFilter = NULL;
}
tlen += taosEncodeFixedI64(buf, pReq->ver);
tlen += taosEncodeFixedI64(buf, pReq->indexUid);
tlen += taosEncodeString(buf, pReq->indexName);
return tlen;
return 0;
}
void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) {
buf = taosDecodeFixedI64(buf, &(pReq->ver));
buf = taosDecodeFixedI64(buf, &(pReq->indexUid));
buf = taosDecodeStringTo(buf, pReq->indexName);
return buf;
int32_t tEncodeSVCreateTSmaReq(SEncoder *pCoder, const SVCreateTSmaReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
tEncodeTSma(pCoder, pReq);
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeSVCreateTSmaReq(SDecoder *pCoder, SVCreateTSmaReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
tDecodeTSma(pCoder, pReq);
tEndDecode(pCoder);
return 0;
}
int32_t tEncodeSVDropTSmaReq(SEncoder *pCoder, const SVDropTSmaReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->indexName) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
if (tDecodeCStrTo(pCoder, pReq->indexName) < 0) return -1;
tEndDecode(pCoder);
return 0;
}
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {

View File

@ -242,26 +242,35 @@ SDbObj *mndAcquireDbBySma(SMnode *pMnode, const char *smaName) {
}
static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
SName name = {0};
SEncoder encoder = {0};
int32_t contLen = 0;
SName name = {0};
tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
SVCreateTSmaReq req = {0};
req.tSma.version = 0;
req.tSma.intervalUnit = pSma->intervalUnit;
req.tSma.slidingUnit = pSma->slidingUnit;
req.tSma.timezoneInt = pSma->timezone;
tstrncpy(req.tSma.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN);
req.tSma.exprLen = pSma->exprLen;
req.tSma.tagsFilterLen = pSma->tagsFilterLen;
req.tSma.indexUid = pSma->uid;
req.tSma.tableUid = pSma->stbUid;
req.tSma.interval = pSma->interval;
req.tSma.offset = pSma->offset;
req.tSma.sliding = pSma->sliding;
req.tSma.expr = pSma->expr;
req.tSma.tagsFilter = pSma->tagsFilter;
req.version = 0;
req.intervalUnit = pSma->intervalUnit;
req.slidingUnit = pSma->slidingUnit;
req.timezoneInt = pSma->timezone;
tstrncpy(req.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN);
req.exprLen = pSma->exprLen;
req.tagsFilterLen = pSma->tagsFilterLen;
req.indexUid = pSma->uid;
req.tableUid = pSma->stbUid;
req.interval = pSma->interval;
req.offset = pSma->offset;
req.sliding = pSma->sliding;
req.expr = pSma->expr;
req.tagsFilter = pSma->tagsFilter;
// get length
int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTSmaReq, &req, contLen, ret);
if (ret < 0) {
return NULL;
}
contLen += sizeof(SMsgHead);
int32_t contLen = tSerializeSVCreateTSmaReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead *pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -272,22 +281,38 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm
pHead->vgId = htonl(pVgroup->vgId);
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tSerializeSVCreateTSmaReq(&pBuf, &req);
tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
if (tEncodeSVCreateTSmaReq(&encoder, &req) < 0) {
taosMemoryFreeClear(pHead);
tEncoderClear(&encoder);
return NULL;
}
tEncoderClear(&encoder);
*pContLen = contLen;
return pHead;
}
static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
SEncoder encoder = {0};
int32_t contLen;
SName name = {0};
tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
SVDropTSmaReq req = {0};
req.ver = 0;
req.indexUid = pSma->uid;
tstrncpy(req.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN);
int32_t contLen = tSerializeSVDropTSmaReq(NULL, &req) + sizeof(SMsgHead);
// get length
int32_t ret = 0;
tEncodeSize(tEncodeSVDropTSmaReq, &req, contLen, ret);
if (ret < 0) {
return NULL;
}
contLen += sizeof(SMsgHead);
SMsgHead *pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -298,7 +323,14 @@ static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma,
pHead->vgId = htonl(pVgroup->vgId);
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tDeserializeSVDropTSmaReq(&pBuf, &req);
tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
if (tEncodeSVDropTSmaReq(&encoder, &req) < 0) {
taosMemoryFreeClear(pHead);
tEncoderClear(&encoder);
return NULL;
}
tEncoderClear(&encoder);
*pContLen = contLen;
return pHead;

View File

@ -425,6 +425,10 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
if (tEncodeSVCreateStbReq(&encoder, &req) < 0) {
taosMemoryFreeClear(pHead);
taosMemoryFreeClear(req.pRSmaParam.qmsg1);
taosMemoryFreeClear(req.pRSmaParam.qmsg2);
tEncoderClear(&encoder);
return NULL;
}
tEncoderClear(&encoder);

View File

@ -18,12 +18,21 @@ target_sources(
"src/meta/metaOpen.c"
"src/meta/metaIdx.c"
"src/meta/metaTable.c"
"src/meta/metaSma.c"
"src/meta/metaQuery.c"
"src/meta/metaCommit.c"
"src/meta/metaEntry.c"
# sma
"src/sma/sma.c"
"src/sma/smaTDBImpl.c"
"src/sma/smaEnv.c"
"src/sma/smaOpen.c"
"src/sma/smaRollup.c"
"src/sma/smaTimeRange.c"
# tsdb
"src/tsdb/tsdbTDBImpl.c"
# "src/tsdb/tsdbTDBImpl.c"
"src/tsdb/tsdbCommit.c"
"src/tsdb/tsdbCommit2.c"
"src/tsdb/tsdbFile.c"
@ -33,7 +42,7 @@ target_sources(
"src/tsdb/tsdbMemTable2.c"
"src/tsdb/tsdbRead.c"
"src/tsdb/tsdbReadImpl.c"
"src/tsdb/tsdbSma.c"
# "src/tsdb/tsdbSma.c"
"src/tsdb/tsdbWrite.c"
# tq

View File

@ -191,6 +191,9 @@ struct SMetaEntry {
int32_t ttlDays;
SSchemaWrapper schema;
} ntbEntry;
struct {
STSma *tsma;
} smaEntry;
};
};

View File

@ -73,6 +73,7 @@ struct SMeta {
TDB* pCtbIdx;
TDB* pTagIdx;
TDB* pTtlIdx;
TDB* pSmaIdx;
SMetaIdx* pIdx;
};
@ -108,6 +109,11 @@ typedef struct {
tb_uid_t uid;
} STtlIdxKey;
typedef struct {
tb_uid_t uid;
int64_t smaUid;
} SSmaIdxKey;
#if 1
SMSmaCursor* metaOpenSmaCursor(SMeta* pMeta, tb_uid_t uid);
@ -118,7 +124,7 @@ int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur);
// SMetaDB
int metaOpenDB(SMeta* pMeta);
void metaCloseDB(SMeta* pMeta);
// int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);

View File

@ -0,0 +1,225 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_SMA_H_
#define _TD_VNODE_SMA_H_
#include "vnodeInt.h"
#ifdef __cplusplus
extern "C" {
#endif
// smaDebug ================
// clang-format off
#define smaFatal(...) do { if (smaDebugFlag & DEBUG_FATAL) { taosPrintLog("SMA FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define smaError(...) do { if (smaDebugFlag & DEBUG_ERROR) { taosPrintLog("SMA ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define smaWarn(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define smaInfo(...) do { if (smaDebugFlag & DEBUG_INFO) { taosPrintLog("SMA ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
typedef struct SSmaEnv SSmaEnv;
typedef struct SSmaStat SSmaStat;
typedef struct SSmaStatItem SSmaStatItem;
typedef struct SSmaKey SSmaKey;
typedef struct SRSmaInfo SRSmaInfo;
#define SMA_IVLD_FID INT_MIN
struct SSmaEnv {
TdThreadRwlock lock;
int8_t type;
TXN txn;
void *pPool; // SPoolMem
SDiskID did;
TENV *dbEnv; // TODO: If it's better to put it in smaIndex level?
char *path; // relative path
SSmaStat *pStat;
};
#define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_TYPE(env) ((env)->type)
#define SMA_ENV_DID(env) ((env)->did)
#define SMA_ENV_ENV(env) ((env)->dbEnv)
#define SMA_ENV_PATH(env) ((env)->path)
#define SMA_ENV_STAT(env) ((env)->pStat)
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
struct SSmaStatItem {
/**
* @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service.
* - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from
* Streaming Module or TSDB local persistence.
* - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
* without information about its previous state.
* - TSDB_SMA_STAT_DROPPED: 1)sma dropped
* N.B. only applicable to tsma
*/
int8_t state; // ETsdbSmaStat
SHashObj *expiredWindows; // key: skey of time window, value: N/A
STSma *pTSma; // cache schema
};
struct SSmaStat {
union {
SHashObj *smaStatItems; // key: indexUid, value: SSmaStatItem for tsma
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
};
T_REF_DECLARE()
};
#define SMA_STAT_ITEMS(s) ((s)->smaStatItems)
#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
struct SSmaKey {
TSKEY skey;
int64_t groupId;
};
typedef struct SDBFile SDBFile;
struct SDBFile {
int32_t fid;
TDB *pDB;
char *path;
};
int32_t tdSmaBeginCommit(SSmaEnv *pEnv);
int32_t tdSmaEndCommit(SSmaEnv *pEnv);
int32_t smaOpenDBEnv(TENV **ppEnv, const char *path);
int32_t smaCloseDBEnv(TENV *pEnv);
int32_t smaOpenDBF(TENV *pEnv, SDBFile *pDBF);
int32_t smaCloseDBF(SDBFile *pDBF);
int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn);
void *smaGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_t *valLen);
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
#if 0
int32_t tbGetTSmaStatus(SSma *pSma, STSma *param, void *result);
int32_t tbRemoveTSmaData(SSma *pSma, STSma *param, STimeWindow *pWin);
#endif
static FORCE_INLINE int32_t tdEncodeTSmaKey(int64_t groupId, TSKEY tsKey, void **pData) {
int32_t len = 0;
len += taosEncodeFixedI64(pData, tsKey);
len += taosEncodeFixedI64(pData, groupId);
return len;
}
int32_t tdInitSma(SSma *pSma);
int32_t tdDropTSma(SSma *pSma, char *pMsg);
int32_t tdDropTSmaData(SSma *pSma, int64_t indexUid);
int32_t tdInsertRSmaData(SSma *pSma, char *msg);
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
int32_t tdLockSma(SSma *pSma);
int32_t tdUnLockSma(SSma *pSma);
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
static FORCE_INLINE int16_t tdTSmaAdd(SSma *pSma, int16_t n) { return atomic_add_fetch_16(&SMA_TSMA_NUM(pSma), n); }
static FORCE_INLINE int16_t tdTSmaSub(SSma *pSma, int16_t n) { return atomic_sub_fetch_16(&SMA_TSMA_NUM(pSma), n); }
static FORCE_INLINE int32_t tdRLockSmaEnv(SSmaEnv *pEnv) {
int code = taosThreadRwlockRdlock(&(pEnv->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int32_t tdWLockSmaEnv(SSmaEnv *pEnv) {
int code = taosThreadRwlockWrlock(&(pEnv->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int32_t tdUnLockSmaEnv(SSmaEnv *pEnv) {
int code = taosThreadRwlockUnlock(&(pEnv->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int8_t tdSmaStat(SSmaStatItem *pStatItem) {
if (pStatItem) {
return atomic_load_8(&pStatItem->state);
}
return TSDB_SMA_STAT_UNKNOWN;
}
static FORCE_INLINE bool tdSmaStatIsOK(SSmaStatItem *pStatItem, int8_t *state) {
if (!pStatItem) {
return false;
}
if (state) {
*state = atomic_load_8(&pStatItem->state);
return *state == TSDB_SMA_STAT_OK;
}
return atomic_load_8(&pStatItem->state) == TSDB_SMA_STAT_OK;
}
static FORCE_INLINE bool tdSmaStatIsExpired(SSmaStatItem *pStatItem) {
return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_EXPIRED) : true;
}
static FORCE_INLINE bool tdSmaStatIsDropped(SSmaStatItem *pStatItem) {
return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_DROPPED) : true;
}
static FORCE_INLINE void tdSmaStatSetOK(SSmaStatItem *pStatItem) {
if (pStatItem) {
atomic_store_8(&pStatItem->state, TSDB_SMA_STAT_OK);
}
}
static FORCE_INLINE void tdSmaStatSetExpired(SSmaStatItem *pStatItem) {
if (pStatItem) {
atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_EXPIRED);
}
}
static FORCE_INLINE void tdSmaStatSetDropped(SSmaStatItem *pStatItem) {
if (pStatItem) {
atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_DROPPED);
}
}
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType);
void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem);
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path, SDiskID did);
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SDiskID did, SSmaEnv **pEnv);
void *tdFreeRSmaInfo(SRSmaInfo *pInfo);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_SMA_H_*/

View File

@ -60,31 +60,22 @@ typedef struct {
TSKEY minKey;
} SRtn;
struct SSmaEnvs {
int16_t nTSma;
int16_t nRSma;
SSmaEnv *pTSmaEnv;
SSmaEnv *pRSmaEnv;
};
#define TSDB_DATA_DIR_LEN 6
struct STsdb {
char *path;
SVnode *pVnode;
TdThreadMutex mutex;
char dir[TSDB_DATA_DIR_LEN];
bool repoLocked;
int8_t level; // retention level
STsdbKeepCfg keepCfg;
STsdbMemTable *mem;
STsdbMemTable *imem;
SRtn rtn;
STsdbFS *fs;
SSmaEnvs smaEnvs;
};
#if 1 // ======================================
typedef struct SSmaStat SSmaStat;
struct STable {
uint64_t tid;
uint64_t uid;
@ -95,10 +86,6 @@ struct STable {
#define TABLE_UID(t) (t)->uid
int tsdbPrepareCommit(STsdb *pTsdb);
int32_t tsdbInitSma(STsdb *pTsdb);
int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg);
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
typedef enum {
TSDB_FILE_HEAD = 0, // .head
TSDB_FILE_DATA, // .data
@ -107,8 +94,6 @@ typedef enum {
TSDB_FILE_SMAL, // .smal(Block-wise SMA)
TSDB_FILE_MAX, //
TSDB_FILE_META, // meta
TSDB_FILE_TSMA, // v2t100.${sma_index_name}, Time-range-wise SMA
TSDB_FILE_RSMA, // v2r100.${sma_index_name}, Time-range-wise Rollup SMA
} E_TSDB_FILE_T;
typedef struct {
@ -186,15 +171,10 @@ struct STsdbFS {
#define REPO_ID(r) TD_VID((r)->pVnode)
#define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg)
#define REPO_KEEP_CFG(r) (&(r)->keepCfg)
#define REPO_LEVEL(r) ((r)->level)
#define REPO_FS(r) ((r)->fs)
#define REPO_META(r) ((r)->pVnode->pMeta)
#define REPO_TFS(r) ((r)->pVnode->pTfs)
#define IS_REPO_LOCKED(r) ((r)->repoLocked)
#define REPO_TSMA_NUM(r) ((r)->smaEnvs.nTSma)
#define REPO_RSMA_NUM(r) ((r)->smaEnvs.nRSma)
#define REPO_TSMA_ENV(r) ((r)->smaEnvs.pTSmaEnv)
#define REPO_RSMA_ENV(r) ((r)->smaEnvs.pRSmaEnv)
int tsdbLockRepo(STsdb *pTsdb);
int tsdbUnlockRepo(STsdb *pTsdb);
@ -794,25 +774,6 @@ typedef struct {
} SFSHeader;
// ================== TSDB File System Meta
/**
* @brief Directory structure of .tsma data files.
*
* /vnode2/tsdb $ tree tsma/
* tsma/
* v2f100.index_name_1
* v2f101.index_name_1
* v2f102.index_name_1
* v2f1900.index_name_3
* v2f1901.index_name_3
* v2f1902.index_name_3
* v2f200.index_name_2
* v2f201.index_name_2
* v2f202.index_name_2
*
* 0 directories, 9 files
*/
#define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus)
#define FS_NEW_STATUS(pfs) ((pfs)->nstatus)
#define FS_IN_TXN(pfs) (pfs)->intxn
@ -874,43 +835,6 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
return 0;
}
typedef struct SSmaKey SSmaKey;
struct SSmaKey {
TSKEY skey;
int64_t groupId;
};
typedef struct SDBFile SDBFile;
struct SDBFile {
int32_t fid;
TDB *pDB;
char *path;
};
int32_t tsdbOpenDBEnv(TENV **ppEnv, const char *path);
int32_t tsdbCloseDBEnv(TENV *pEnv);
int32_t tsdbOpenDBF(TENV *pEnv, SDBFile *pDBF);
int32_t tsdbCloseDBF(SDBFile *pDBF);
int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn);
void *tsdbGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_t *valLen);
void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv);
#if 0
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result);
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
#endif
// internal func
static FORCE_INLINE int32_t tsdbEncodeTSmaKey(int64_t groupId, TSKEY tsKey, void **pData) {
int32_t len = 0;
len += taosEncodeFixedI64(pData, tsKey);
len += taosEncodeFixedI64(pData, groupId);
return len;
}
#endif
#ifdef __cplusplus

View File

@ -47,13 +47,15 @@
extern "C" {
#endif
typedef struct SVnodeInfo SVnodeInfo;
typedef struct SMeta SMeta;
typedef struct STsdb STsdb;
typedef struct STQ STQ;
typedef struct SVState SVState;
typedef struct SVBufPool SVBufPool;
typedef struct SQWorker SQHandle;
typedef struct SVnodeInfo SVnodeInfo;
typedef struct SMeta SMeta;
typedef struct SSma SSma;
typedef struct STsdb STsdb;
typedef struct STQ STQ;
typedef struct SVState SVState;
typedef struct SVBufPool SVBufPool;
typedef struct SQWorker SQHandle;
typedef struct STsdbKeepCfg STsdbKeepCfg;
#define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb"
@ -90,17 +92,14 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
SArray* metaGetSmaTbUids(SMeta* pMeta, bool isDup);
void* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid, bool isDecode);
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid);
int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
// tsdb
int tsdbOpen(SVnode* pVnode, int8_t type);
int tsdbClose(STsdb* pTsdb);
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg *pKeepCfg);
int tsdbClose(STsdb** pTsdb);
int tsdbBegin(STsdb* pTsdb);
int tsdbCommit(STsdb* pTsdb);
int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version);
int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
@ -121,13 +120,31 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t wo
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
// sma
int32_t smaOpen(SVnode* pVnode);
int32_t smaClose(SSma* pSma);
int32_t tdUpdateExpireWindow(SSma* pSma, SSubmitReq* pMsg, int64_t version);
int32_t tdProcessTSmaCreate(SSma* pSma, char* pMsg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
int32_t tdProcessRSmaCreate(SSma* pSma, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb);
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore);
void tdUidStoreDestory(STbUidStore* pStore);
void* tdUidStoreFree(STbUidStore* pStore);
#if 0
int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version);
int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg);
int32_t tsdbRegisterRSma(STsdb* pTsdb, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb);
int32_t tsdbFetchTbUidList(STsdb* pTsdb, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore);
void tsdbUidStoreDestory(STbUidStore* pStore);
void* tsdbUidStoreFree(STbUidStore* pStore);
int32_t tsdbTriggerRSma(STsdb* pTsdb, void* pMsg, int32_t inputType);
#endif
typedef struct {
int8_t streamType; // sma or other
@ -164,13 +181,13 @@ typedef enum {
TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2
} ETsdbType;
typedef struct {
struct STsdbKeepCfg{
int8_t precision; // precision always be used with below keep cfgs
int32_t days;
int32_t keep0;
int32_t keep1;
int32_t keep2;
} STsdbKeepCfg;
};
struct SVnode {
char* path;
@ -183,9 +200,8 @@ struct SVnode {
SVBufPool* onCommit;
SVBufPool* onRecycle;
SMeta* pMeta;
SSma* pSma;
STsdb* pTsdb;
STsdb* pRSma1;
STsdb* pRSma2;
SWal* pWal;
STQ* pTq;
SSink* pSink;
@ -194,10 +210,12 @@ struct SVnode {
SQHandle* pQuery;
};
#define TD_VID(PVNODE) (PVNODE)->config.vgId
#define VND_TSDB(vnd) ((vnd)->pTsdb)
#define VND_RSMA0(vnd) ((vnd)->pTsdb)
#define VND_RSMA1(vnd) ((vnd)->pRSma1)
#define VND_RSMA2(vnd) ((vnd)->pRSma2)
#define VND_RSMA1(vnd) ((vnd)->pSma->pRSmaTsdb1)
#define VND_RSMA2(vnd) ((vnd)->pSma->pRSmaTsdb2)
#define VND_RETENTIONS(vnd) (&(vnd)->config.tsdbCfg.retentions)
struct STbUidStore {
@ -207,7 +225,29 @@ struct STbUidStore {
SHashObj* uidHash;
};
#define TD_VID(PVNODE) (PVNODE)->config.vgId
struct SSma {
int16_t nTSma;
bool locked;
TdThreadMutex mutex;
SVnode* pVnode;
STsdb* pRSmaTsdb1;
STsdb* pRSmaTsdb2;
void* pTSmaEnv;
void* pRSmaEnv;
};
#define SMA_CFG(s) (&(s)->pVnode->config)
#define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg)
#define SMA_LOCKED(s) ((s)->locked)
#define SMA_META(s) ((s)->pVnode->pMeta)
#define SMA_VID(s) TD_VID((s)->pVnode)
#define SMA_TFS(s) ((s)->pVnode->pTfs)
#define SMA_TSMA_NUM(s) ((s)->nTSma)
#define SMA_TSMA_ENV(s) ((s)->pTSmaEnv)
#define SMA_RSMA_ENV(s) ((s)->pRSmaEnv)
#define SMA_RSMA_TSDB0(s) ((s)->pVnode->pTsdb)
#define SMA_RSMA_TSDB1(s) ((s)->pRSmaTsdb1)
#define SMA_RSMA_TSDB2(s) ((s)->pRSmaTsdb2)
static FORCE_INLINE bool vnodeIsRollup(SVnode* pVnode) {
SRetention* pRetention = &(pVnode->config.tsdbCfg.retentions[0]);

View File

@ -35,6 +35,8 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
} else if (pME->type == TSDB_TSMA_TABLE) {
if (tEncodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1;
} else {
ASSERT(0);
}
@ -64,7 +66,9 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
} else {
} else if (pME->type == TSDB_TSMA_TABLE) {
if (tDecodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1;
} else {
ASSERT(0);
}

View File

@ -112,35 +112,4 @@ int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid) {
#endif
// TODO
return 0;
}
int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg) {
// TODO: Validate the cfg
// The table uid should exists and be super table or common table.
// Check other cfg value
// TODO: add atomicity
#ifdef META_REFACT
#else
if (metaSaveSmaToDB(pMeta, &pCfg->tSma) < 0) {
// TODO: handle error
return -1;
}
#endif
return TSDB_CODE_SUCCESS;
}
int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) {
// TODO: Validate the cfg
// TODO: add atomicity
#ifdef META_REFACT
#else
if (metaRemoveSmaFromDb(pMeta, indexUid) < 0) {
// TODO: handle error
return -1;
}
#endif
return TSDB_CODE_SUCCESS;
}

View File

@ -21,6 +21,7 @@ static int ctbIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
@ -104,6 +105,13 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
goto _err;
}
// open pSmaIdx
ret = tdbDbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta sma index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open index
if (metaOpenIdx(pMeta) < 0) {
metaError("vgId:%d failed to open meta index since %s", TD_VID(pVnode), tstrerror(terrno));
@ -117,11 +125,12 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
_err:
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pSmaIdx) tdbDbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbDbClose(pMeta->pTtlIdx);
if (pMeta->pTagIdx) tdbDbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbDbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbDbClose(pMeta->pNameIdx);
if (pMeta->pNameIdx) tdbDbClose(pMeta->pUidIdx);
if (pMeta->pUidIdx) tdbDbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbDbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbDbClose(pMeta->pTbDb);
if (pMeta->pEnv) tdbEnvClose(pMeta->pEnv);
@ -133,11 +142,12 @@ _err:
int metaClose(SMeta *pMeta) {
if (pMeta) {
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pSmaIdx) tdbDbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbDbClose(pMeta->pTtlIdx);
if (pMeta->pTagIdx) tdbDbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbDbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbDbClose(pMeta->pNameIdx);
if (pMeta->pNameIdx) tdbDbClose(pMeta->pUidIdx);
if (pMeta->pUidIdx) tdbDbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbDbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbDbClose(pMeta->pTbDb);
if (pMeta->pEnv) tdbEnvClose(pMeta->pEnv);
@ -295,3 +305,22 @@ static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
return 0;
}
static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
SSmaIdxKey *pSmaIdxKey1 = (SSmaIdxKey *)pKey1;
SSmaIdxKey *pSmaIdxKey2 = (SSmaIdxKey *)pKey2;
if (pSmaIdxKey1->uid > pSmaIdxKey2->uid) {
return 1;
} else if (pSmaIdxKey1->uid < pSmaIdxKey2->uid) {
return -1;
}
if (pSmaIdxKey1->smaUid > pSmaIdxKey2->smaUid) {
return 1;
} else if (pSmaIdxKey1->smaUid < pSmaIdxKey2->smaUid) {
return -1;
}
return 0;
}

View File

@ -0,0 +1,208 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME);
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME);
int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
// TODO: Validate the cfg
// The table uid should exists and be super table or normal table.
// Check other cfg value
SMetaEntry me = {0};
int kLen = 0;
int vLen = 0;
const void *pKey = NULL;
const void *pVal = NULL;
void *pBuf = NULL;
int32_t szBuf = 0;
void *p = NULL;
SMetaReader mr = {0};
// validate req
metaReaderInit(&mr, pMeta, 0);
if (metaGetTableEntryByUid(&mr, pCfg->indexUid) == 0) {
// TODO: just for pass case
#if 1
terrno = TSDB_CODE_TDB_TSMA_ALREADY_EXIST;
metaReaderClear(&mr);
return -1;
#else
metaReaderClear(&mr);
return 0;
#endif
}
metaReaderClear(&mr);
// set structs
me.version = version;
me.type = TSDB_TSMA_TABLE;
me.uid = pCfg->indexUid;
me.name = pCfg->indexName;
me.smaEntry.tsma = pCfg;
if (metaHandleSmaEntry(pMeta, &me) < 0) goto _err;
metaDebug("vgId:%d tsma is created, name:%s uid: %" PRId64, TD_VID(pMeta->pVnode), pCfg->indexName, pCfg->indexUid);
return 0;
_err:
metaError("vgId:%d failed to create tsma: %s uid: %" PRId64 " since %s", TD_VID(pMeta->pVnode), pCfg->indexName,
pCfg->indexUid, tstrerror(terrno));
return -1;
}
int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) {
// TODO: Validate the cfg
// TODO: add atomicity
#ifdef META_REFACT
#else
if (metaRemoveSmaFromDb(pMeta, indexUid) < 0) {
// TODO: handle error
return -1;
}
#endif
return TSDB_CODE_SUCCESS;
}
// static int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
// int32_t ret = 0;
// void *pBuf = NULL, *qBuf = NULL;
// void *key = {0}, *val = {0};
// // save sma info
// int32_t len = tEncodeTSma(NULL, pSmaCfg);
// pBuf = taosMemoryCalloc(1, len);
// if (pBuf == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// return -1;
// }
// key = (void *)&pSmaCfg->indexUid;
// qBuf = pBuf;
// tEncodeTSma(&qBuf, pSmaCfg);
// val = pBuf;
// int32_t kLen = sizeof(pSmaCfg->indexUid);
// int32_t vLen = POINTER_DISTANCE(qBuf, pBuf);
// ret = tdbDbInsert(pMeta->pTbDb, key, kLen, val, vLen, &pMeta->txn);
// if (ret < 0) {
// taosMemoryFreeClear(pBuf);
// return -1;
// }
// // add sma idx
// SSmaIdxKey smaIdxKey;
// smaIdxKey.uid = pSmaCfg->tableUid;
// smaIdxKey.smaUid = pSmaCfg->indexUid;
// key = &smaIdxKey;
// kLen = sizeof(smaIdxKey);
// val = NULL;
// vLen = 0;
// ret = tdbDbInsert(pMeta->pSmaIdx, key, kLen, val, vLen, &pMeta->txn);
// if (ret < 0) {
// taosMemoryFreeClear(pBuf);
// return -1;
// }
// // release
// taosMemoryFreeClear(pBuf);
// return 0;
// }
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) {
STbDbKey tbDbKey;
void *pKey = NULL;
void *pVal = NULL;
int kLen = 0;
int vLen = 0;
SEncoder coder = {0};
// set key and value
tbDbKey.version = pME->version;
tbDbKey.uid = pME->uid;
pKey = &tbDbKey;
kLen = sizeof(tbDbKey);
int32_t ret = 0;
tEncodeSize(metaEncodeEntry, pME, vLen, ret);
if (ret < 0) {
goto _err;
}
pVal = taosMemoryMalloc(vLen);
if (pVal == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
tEncoderInit(&coder, pVal, vLen);
if (metaEncodeEntry(&coder, pME) < 0) {
goto _err;
}
tEncoderClear(&coder);
// write to table.db
if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
goto _err;
}
taosMemoryFree(pVal);
return 0;
_err:
taosMemoryFree(pVal);
return -1;
}
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
}
static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) {
SSmaIdxKey smaIdxKey = {.uid = pME->smaEntry.tsma->tableUid, .smaUid = pME->smaEntry.tsma->indexUid};
return tdbDbInsert(pMeta->pSmaIdx, &smaIdxKey, sizeof(smaIdxKey), NULL, 0, &pMeta->txn);
}
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) {
metaWLock(pMeta);
// save to table.db
if (metaSaveSmaToDB(pMeta, pME) < 0) goto _err;
// // update uid.idx
if (metaUpdateUidIdx(pMeta, pME) < 0) goto _err;
if (metaUpdateSmaIdx(pMeta, pME) < 0) goto _err;
metaULock(pMeta);
return 0;
_err:
metaULock(pMeta);
return -1;
}

View File

@ -13,35 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_TSDB_SMA_H_
#define _TD_VNODE_TSDB_SMA_H_
#include "sma.h"
#include "tsdb.h"
#ifdef __cplusplus
extern "C" {
#endif
// TODO: Who is responsible for resource allocate and release?
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg) {
int32_t code = TSDB_CODE_SUCCESS;
// typedef int32_t (*__tb_ddl_fn_t)(void *ahandle, void **result, void *p1, void *p2);
// struct STbDdlH {
// void *ahandle;
// void *result;
// __tb_ddl_fn_t fp;
// };
static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) {
ASSERT(*pStore == NULL);
*pStore = taosMemoryCalloc(1, sizeof(STbUidStore));
if (*pStore == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
smaWarn("vgId:%d insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
return TSDB_CODE_SUCCESS;
// TODO: destroy SSDataBlocks(msg)
return code;
}
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_TSDB_SMA_H_*/

View File

@ -0,0 +1,463 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
typedef struct SSmaStat SSmaStat;
static const char *TSDB_SMA_DNAME[] = {
"", // TSDB_SMA_TYPE_BLOCK
"tsma", // TSDB_SMA_TYPE_TIME_RANGE
"rsma", // TSDB_SMA_TYPE_ROLLUP
};
#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test
#define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test
#define SMA_STATE_HASH_SLOT 4
#define RSMA_TASK_INFO_HASH_SLOT 8
typedef struct SPoolMem {
int64_t size;
struct SPoolMem *prev;
struct SPoolMem *next;
} SPoolMem;
// declaration of static functions
// insert data
static void tdGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]);
// Pool Memory
static SPoolMem *openPool();
static void clearPool(SPoolMem *pPool);
static void closePool(SPoolMem *pPool);
static void *poolMalloc(void *arg, size_t size);
static void poolFree(void *arg, void *ptr);
// implementation
static SPoolMem *openPool() {
SPoolMem *pPool = (SPoolMem *)taosMemoryMalloc(sizeof(*pPool));
pPool->prev = pPool->next = pPool;
pPool->size = 0;
return pPool;
}
static void clearPool(SPoolMem *pPool) {
if (!pPool) return;
SPoolMem *pMem;
do {
pMem = pPool->next;
if (pMem == pPool) break;
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
taosMemoryFree(pMem);
} while (1);
assert(pPool->size == 0);
}
static void closePool(SPoolMem *pPool) {
if (pPool) {
clearPool(pPool);
taosMemoryFree(pPool);
}
}
static void *poolMalloc(void *arg, size_t size) {
void *ptr = NULL;
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = (SPoolMem *)taosMemoryMalloc(sizeof(*pMem) + size);
if (!pMem) {
assert(0);
}
pMem->size = sizeof(*pMem) + size;
pMem->next = pPool->next;
pMem->prev = pPool;
pPool->next->prev = pMem;
pPool->next = pMem;
pPool->size += pMem->size;
ptr = (void *)(&pMem[1]);
return ptr;
}
static void poolFree(void *arg, void *ptr) {
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = &(((SPoolMem *)ptr)[-1]);
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
taosMemoryFree(pMem);
}
int32_t tdInitSma(SSma *pSma) {
// tSma
int32_t numOfTSma = taosArrayGetSize(metaGetSmaTbUids(SMA_META(pSma), false));
if (numOfTSma > 0) {
atomic_store_16(&SMA_TSMA_NUM(pSma), (int16_t)numOfTSma);
}
// TODO: rSma
return TSDB_CODE_SUCCESS;
}
static void tdGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TSDB_SMA_DNAME[smaType]);
}
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path, SDiskID did) {
SSmaEnv *pEnv = NULL;
pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv));
if (!pEnv) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SMA_ENV_TYPE(pEnv) = smaType;
int code = taosThreadRwlockInit(&(pEnv->lock), NULL);
if (code) {
terrno = TAOS_SYSTEM_ERROR(code);
taosMemoryFree(pEnv);
return NULL;
}
ASSERT(path && (strlen(path) > 0));
SMA_ENV_PATH(pEnv) = strdup(path);
if (!SMA_ENV_PATH(pEnv)) {
tdFreeSmaEnv(pEnv);
return NULL;
}
SMA_ENV_DID(pEnv) = did;
if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType) != TSDB_CODE_SUCCESS) {
tdFreeSmaEnv(pEnv);
return NULL;
}
char aname[TSDB_FILENAME_LEN] = {0};
tfsAbsoluteName(SMA_TFS(pSma), did, path, aname);
if (smaOpenDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) {
tdFreeSmaEnv(pEnv);
return NULL;
}
if (!(pEnv->pPool = openPool())) {
tdFreeSmaEnv(pEnv);
return NULL;
}
return pEnv;
}
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SDiskID did, SSmaEnv **pEnv) {
if (!pEnv) {
terrno = TSDB_CODE_INVALID_PTR;
return TSDB_CODE_FAILED;
}
if (!(*pEnv)) {
if (!(*pEnv = tdNewSmaEnv(pSma, smaType, path, did))) {
return TSDB_CODE_FAILED;
}
}
return TSDB_CODE_SUCCESS;
}
/**
* @brief Release resources allocated for its member fields, not including itself.
*
* @param pSmaEnv
* @return int32_t
*/
void tdDestroySmaEnv(SSmaEnv *pSmaEnv) {
if (pSmaEnv) {
tdDestroySmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
taosMemoryFreeClear(pSmaEnv->pStat);
taosMemoryFreeClear(pSmaEnv->path);
taosThreadRwlockDestroy(&(pSmaEnv->lock));
smaCloseDBEnv(pSmaEnv->dbEnv);
closePool(pSmaEnv->pPool);
}
}
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv) {
tdDestroySmaEnv(pSmaEnv);
taosMemoryFreeClear(pSmaEnv);
return NULL;
}
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat) {
if (!pStat) return 0;
int ref = T_REF_INC(pStat);
smaDebug("vgId:%d ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref);
return 0;
}
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
if (!pStat) return 0;
int ref = T_REF_DEC(pStat);
smaDebug("vgId:%d unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref);
return 0;
}
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
ASSERT(pSmaStat != NULL);
if (*pSmaStat) { // no lock
return TSDB_CODE_SUCCESS;
}
/**
* 1. Lazy mode utilized when init SSmaStat to update expired window(or hungry mode when tdNew).
* 2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
* tdInitSmaStat invoked in other multithread environment later.
*/
if (!(*pSmaStat)) {
*pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat));
if (!(*pSmaStat)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
if (smaType == TSDB_SMA_TYPE_ROLLUP) {
SMA_STAT_INFO_HASH(*pSmaStat) = taosHashInit(
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!SMA_STAT_INFO_HASH(*pSmaStat)) {
taosMemoryFreeClear(*pSmaStat);
return TSDB_CODE_FAILED;
}
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
SMA_STAT_ITEMS(*pSmaStat) =
taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (!SMA_STAT_ITEMS(*pSmaStat)) {
taosMemoryFreeClear(*pSmaStat);
return TSDB_CODE_FAILED;
}
} else {
ASSERT(0);
}
}
return TSDB_CODE_SUCCESS;
}
void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
if (pSmaStatItem) {
tdDestroyTSma(pSmaStatItem->pTSma);
taosMemoryFreeClear(pSmaStatItem->pTSma);
taosHashCleanup(pSmaStatItem->expiredWindows);
taosMemoryFreeClear(pSmaStatItem);
}
return NULL;
}
/**
* @brief Release resources allocated for its member fields, not including itself.
*
* @param pSmaStat
* @return int32_t
*/
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
if (pSmaStat) {
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
void *item = taosHashIterate(SMA_STAT_ITEMS(pSmaStat), NULL);
while (item) {
SSmaStatItem *pItem = *(SSmaStatItem **)item;
tdFreeSmaStatItem(pItem);
item = taosHashIterate(SMA_STAT_ITEMS(pSmaStat), item);
}
taosHashCleanup(SMA_STAT_ITEMS(pSmaStat));
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
void *infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), NULL);
while (infoHash) {
SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash;
tdFreeRSmaInfo(pInfoHash);
infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), infoHash);
}
taosHashCleanup(SMA_STAT_INFO_HASH(pSmaStat));
} else {
ASSERT(0);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t tdLockSma(SSma *pSma) {
int code = taosThreadMutexLock(&pSma->mutex);
if (code != 0) {
smaError("vgId:%d failed to lock td since %s", SMA_VID(pSma), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
pSma->locked = true;
return 0;
}
int32_t tdUnLockSma(SSma *pSma) {
ASSERT(SMA_LOCKED(pSma));
pSma->locked = false;
int code = taosThreadMutexUnlock(&pSma->mutex);
if (code != 0) {
smaError("vgId:%d failed to unlock td since %s", SMA_VID(pSma), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
SSmaEnv *pEnv = NULL;
// return if already init
switch (smaType) {
case TSDB_SMA_TYPE_TIME_RANGE:
if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_TSMA_ENV(pSma)))) {
return TSDB_CODE_SUCCESS;
}
break;
case TSDB_SMA_TYPE_ROLLUP:
if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_RSMA_ENV(pSma)))) {
return TSDB_CODE_SUCCESS;
}
break;
default:
TASSERT(0);
return TSDB_CODE_FAILED;
}
// init sma env
tdLockSma(pSma);
pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&SMA_TSMA_ENV(pSma))
: atomic_load_ptr(&SMA_RSMA_ENV(pSma));
if (!pEnv) {
char rname[TSDB_FILENAME_LEN] = {0};
SDiskID did = {0};
if (tfsAllocDisk(SMA_TFS(pSma), TFS_PRIMARY_LEVEL, &did) < 0) {
tdUnLockSma(pSma);
return TSDB_CODE_FAILED;
}
if (did.level < 0 || did.id < 0) {
tdUnLockSma(pSma);
smaError("vgId:%d init sma env failed since invalid did(%d,%d)", SMA_VID(pSma), did.level, did.id);
return TSDB_CODE_FAILED;
}
tdGetSmaDir(SMA_VID(pSma), smaType, rname);
if (tfsMkdirRecurAt(SMA_TFS(pSma), rname, did) < 0) {
tdUnLockSma(pSma);
return TSDB_CODE_FAILED;
}
if (tdInitSmaEnv(pSma, smaType, rname, did, &pEnv) < 0) {
tdUnLockSma(pSma);
return TSDB_CODE_FAILED;
}
(smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), pEnv)
: atomic_store_ptr(&SMA_RSMA_ENV(pSma), pEnv);
}
tdUnLockSma(pSma);
return TSDB_CODE_SUCCESS;
};
int32_t tdSmaBeginCommit(SSmaEnv *pEnv) {
TXN *pTxn = &pEnv->txn;
// start a new txn
tdbTxnOpen(pTxn, 0, poolMalloc, poolFree, pEnv->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
if (tdbBegin(pEnv->dbEnv, pTxn) != 0) {
smaWarn("tdSma tdb begin commit fail");
return -1;
}
return 0;
}
int32_t tdSmaEndCommit(SSmaEnv *pEnv) {
TXN *pTxn = &pEnv->txn;
// Commit current txn
if (tdbCommit(pEnv->dbEnv, pTxn) != 0) {
smaWarn("tdSma tdb end commit fail");
return -1;
}
tdbTxnClose(pTxn);
clearPool(pEnv->pPool);
return 0;
}
#if 0
/**
* @brief Get the start TS key of the last data block of one interval/sliding.
*
* @param pSma
* @param param
* @param result
* @return int32_t
* 1) Return 0 and fill the result if the check procedure is normal;
* 2) Return -1 if error occurs during the check procedure.
*/
int32_t tdGetTSmaStatus(SSma *pSma, void *smaIndex, void *result) {
const char *procedure = "";
if (strncmp(procedure, "get the start TS key of the last data block", 100) != 0) {
return -1;
}
// fill the result
return TSDB_CODE_SUCCESS;
}
/**
* @brief Remove the tSma data files related to param between pWin.
*
* @param pSma
* @param param
* @param pWin
* @return int32_t
*/
int32_t tdRemoveTSmaData(SSma *pSma, void *smaIndex, STimeWindow *pWin) {
// for ("tSmaFiles of param-interval-sliding between pWin") {
// // remove the tSmaFile
// }
return TSDB_CODE_SUCCESS;
}
#endif

View File

@ -0,0 +1,137 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
#include "tsdb.h"
static int32_t smaEvalDays(SRetention *r, int8_t precision);
static int32_t smaSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type);
#define SMA_SET_KEEP_CFG(l) \
do { \
SRetention *r = &pCfg->retentions[l]; \
pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \
pKeepCfg->keep0 = pKeepCfg->keep2; \
pKeepCfg->keep1 = pKeepCfg->keep2; \
pKeepCfg->days = smaEvalDays(r, pCfg->precision); \
} while (0)
#define SMA_OPEN_RSMA_IMPL(v, l) \
do { \
SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \
if (!RETENTION_VALID(r)) { \
if (l == 0) { \
goto _err; \
} \
break; \
} \
smaSetKeepCfg(&keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \
if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg) < 0) { \
goto _err; \
} \
} while (0)
#define RETENTION_DAYS_SPLIT_RATIO 10
#define RETENTION_DAYS_SPLIT_MIN 1
#define RETENTION_DAYS_SPLIT_MAX 30
static int32_t smaEvalDays(SRetention *r, int8_t precision) {
int32_t keepDays = convertTimeFromPrecisionToUnit(r->keep, precision, TIME_UNIT_DAY);
int32_t freqDays = convertTimeFromPrecisionToUnit(r->freq, precision, TIME_UNIT_DAY);
int32_t days = keepDays / RETENTION_DAYS_SPLIT_RATIO;
if (days <= RETENTION_DAYS_SPLIT_MIN) {
days = RETENTION_DAYS_SPLIT_MIN;
if (days < freqDays) {
days = freqDays + 1;
}
} else {
if (days > RETENTION_DAYS_SPLIT_MAX) {
days = RETENTION_DAYS_SPLIT_MAX;
}
if (days < freqDays) {
days = freqDays + 1;
}
}
return days * 1440;
}
int smaSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type) {
pKeepCfg->precision = pCfg->precision;
switch (type) {
case TSDB_TYPE_TSMA:
ASSERT(0);
break;
case TSDB_TYPE_RSMA_L0:
SMA_SET_KEEP_CFG(0);
break;
case TSDB_TYPE_RSMA_L1:
SMA_SET_KEEP_CFG(1);
break;
case TSDB_TYPE_RSMA_L2:
SMA_SET_KEEP_CFG(2);
break;
default:
ASSERT(0);
break;
}
return 0;
}
int32_t smaOpen(SVnode *pVnode) {
STsdbCfg *pCfg = &pVnode->config.tsdbCfg;
ASSERT(!pVnode->pSma);
SSma *pSma = taosMemoryCalloc(1, sizeof(SSma));
if (!pSma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pSma->pVnode = pVnode;
taosThreadMutexInit(&pSma->mutex, NULL);
pSma->locked = false;
if (vnodeIsRollup(pVnode)) {
STsdbKeepCfg keepCfg = {0};
for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
if (i == TSDB_RETENTION_L0) {
SMA_OPEN_RSMA_IMPL(pVnode, 0);
} else if (i == TSDB_RETENTION_L1) {
SMA_OPEN_RSMA_IMPL(pVnode, 1);
} else if (i == TSDB_RETENTION_L2) {
SMA_OPEN_RSMA_IMPL(pVnode, 2);
} else {
ASSERT(0);
}
}
}
pVnode->pSma = pSma;
return 0;
_err:
taosMemoryFreeClear(pSma);
return -1;
}
int32_t smaClose(SSma *pSma) {
if (pSma) {
taosThreadMutexDestroy(&pSma->mutex);
if SMA_RSMA_TSDB0 (pSma) tsdbClose(&SMA_RSMA_TSDB0(pSma));
if SMA_RSMA_TSDB1 (pSma) tsdbClose(&SMA_RSMA_TSDB1(pSma));
if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma));
}
return 0;
}

View File

@ -0,0 +1,484 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
static FORCE_INLINE int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, qTaskInfo_t *taskInfo,
STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, int8_t level);
struct SRSmaInfo {
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
};
static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle) {
// Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
qDestroyTask(otaskHandle);
}
}
void *tdFreeRSmaInfo(SRSmaInfo *pInfo) {
for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
if (pInfo->taskInfo[i]) {
tdFreeTaskHandle(pInfo->taskInfo[i]);
}
}
return NULL;
}
static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
ASSERT(*pStore == NULL);
*pStore = taosMemoryCalloc(1, sizeof(STbUidStore));
if (*pStore == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL;
if (!suid || !tbUids) {
terrno = TSDB_CODE_INVALID_PTR;
smaError("vgId:%d failed to get rsma info for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno));
return TSDB_CODE_FAILED;
}
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t));
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
smaError("vgId:%d failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
return TSDB_CODE_FAILED;
}
if (pRSmaInfo->taskInfo[0] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[0], tbUids, true) != 0)) {
smaError("vgId:%d update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno));
return TSDB_CODE_FAILED;
} else {
smaDebug("vgId:%d update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma),
pRSmaInfo->taskInfo[0], *suid, *(int64_t *)taosArrayGet(tbUids, 0));
}
if (pRSmaInfo->taskInfo[1] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[1], tbUids, true) != 0)) {
smaError("vgId:%d update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno));
return TSDB_CODE_FAILED;
} else {
smaDebug("vgId:%d update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma),
pRSmaInfo->taskInfo[1], *suid, *(int64_t *)taosArrayGet(tbUids, 0));
}
return TSDB_CODE_SUCCESS;
}
int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore) {
if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) {
return TSDB_CODE_SUCCESS;
}
if (tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
void *pIter = taosHashIterate(pStore->uidHash, NULL);
while (pIter) {
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
SArray *pTbUids = *(SArray **)pIter;
if (tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids) != TSDB_CODE_SUCCESS) {
taosHashCancelIterate(pStore->uidHash, pIter);
return TSDB_CODE_FAILED;
}
pIter = taosHashIterate(pStore->uidHash, pIter);
}
return TSDB_CODE_SUCCESS;
}
/**
* @brief fetch suid/uids when create child tables of rollup SMA
*
* @param pTsdb
* @param ppStore
* @param suid
* @param uid
* @return int32_t
*/
int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
// only applicable to rollup SMA ctables
if (!pEnv) {
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SHashObj *infoHash = NULL;
if (!pStat || !(infoHash = SMA_STAT_INFO_HASH(pStat))) {
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
return TSDB_CODE_FAILED;
}
// info cached when create rsma stable and return directly for non-rsma ctables
if (!taosHashGet(infoHash, &suid, sizeof(tb_uid_t))) {
return TSDB_CODE_SUCCESS;
}
ASSERT(ppStore != NULL);
if (!(*ppStore)) {
if (tdUidStoreInit(ppStore) != 0) {
return TSDB_CODE_FAILED;
}
}
if (tdUidStorePut(*ppStore, suid, &uid) != 0) {
*ppStore = tdUidStoreFree(*ppStore);
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
/**
* @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam.
*
* @param pTsdb
* @param pMeta
* @param pReq
* @return int32_t
*/
int32_t tdProcessRSmaCreate(SSma *pSma, SMeta *pMeta, SVCreateStbReq *pReq, SMsgCb *pMsgCb) {
if (!pReq->rollup) {
smaTrace("vgId:%d return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
return TSDB_CODE_SUCCESS;
}
SRSmaParam *param = &pReq->pRSmaParam;
if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) {
smaWarn("vgId:%d no qmsg1/qmsg2 for rollup stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
return TSDB_CODE_SUCCESS;
}
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED;
}
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL;
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t));
if (pRSmaInfo) {
smaWarn("vgId:%d rsma info already exists for stb: %s, %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
return TSDB_CODE_SUCCESS;
}
pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo));
if (!pRSmaInfo) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
STqReadHandle *pReadHandle = tqInitSubmitMsgScanner(pMeta);
if (!pReadHandle) {
taosMemoryFree(pRSmaInfo);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
SReadHandle handle = {
.reader = pReadHandle,
.meta = pMeta,
.pMsgCb = pMsgCb,
};
if (param->qmsg1) {
pRSmaInfo->taskInfo[0] = qCreateStreamExecTaskInfo(param->qmsg1, &handle);
if (!pRSmaInfo->taskInfo[0]) {
taosMemoryFree(pRSmaInfo);
taosMemoryFree(pReadHandle);
return TSDB_CODE_FAILED;
}
}
if (param->qmsg2) {
pRSmaInfo->taskInfo[1] = qCreateStreamExecTaskInfo(param->qmsg2, &handle);
if (!pRSmaInfo->taskInfo[1]) {
taosMemoryFree(pRSmaInfo);
taosMemoryFree(pReadHandle);
return TSDB_CODE_FAILED;
}
}
if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) !=
TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
} else {
smaDebug("vgId:%d register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid);
}
return TSDB_CODE_SUCCESS;
}
/**
* @brief store suid/[uids], prefer to use array and then hash
*
* @param pStore
* @param suid
* @param uid
* @return int32_t
*/
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) {
// prefer to store suid/uids in array
if ((suid == pStore->suid) || (pStore->suid == 0)) {
if (pStore->suid == 0) {
pStore->suid = suid;
}
if (uid) {
if (!pStore->tbUids) {
if (!(pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
}
if (!taosArrayPush(pStore->tbUids, uid)) {
return TSDB_CODE_FAILED;
}
}
} else {
// store other suid/uids in hash when multiple stable/table included in 1 batch of request
if (!pStore->uidHash) {
pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (!pStore->uidHash) {
return TSDB_CODE_FAILED;
}
}
if (uid) {
SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t));
if (uidArray && ((uidArray = *(SArray **)uidArray))) {
taosArrayPush(uidArray, uid);
} else {
SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t));
if (!pUidArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
if (!taosArrayPush(pUidArray, uid)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)) != 0) {
return TSDB_CODE_FAILED;
}
}
} else {
if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0) != 0) {
return TSDB_CODE_FAILED;
}
}
}
return TSDB_CODE_SUCCESS;
}
void tdUidStoreDestory(STbUidStore *pStore) {
if (pStore) {
if (pStore->uidHash) {
if (pStore->tbUids) {
// When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys.
void *pIter = taosHashIterate(pStore->uidHash, NULL);
while (pIter) {
SArray *arr = *(SArray **)pIter;
taosArrayDestroy(arr);
pIter = taosHashIterate(pStore->uidHash, pIter);
}
}
taosHashCleanup(pStore->uidHash);
}
taosArrayDestroy(pStore->tbUids);
}
}
void *tdUidStoreFree(STbUidStore *pStore) {
if (pStore) {
tdUidStoreDestory(pStore);
taosMemoryFree(pStore);
}
return NULL;
}
static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
if (!pReq) {
terrno = TSDB_CODE_INVALID_PTR;
return TSDB_CODE_FAILED;
}
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
ASSERT(pMsg != NULL);
SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0};
STSRow *row = NULL;
terrno = TSDB_CODE_SUCCESS;
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (!pBlock) break;
tdUidStorePut(pStore, msgIter.suid, NULL);
pStore->uid = msgIter.uid; // TODO: remove, just for debugging
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, qTaskInfo_t *taskInfo,
STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, int8_t level) {
SArray *pResult = NULL;
if (!taskInfo) {
smaDebug("vgId:%d no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid);
return TSDB_CODE_SUCCESS;
}
smaDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, taskInfo, suid);
qSetStreamInput(taskInfo, pMsg, inputType);
while (1) {
SSDataBlock *output = NULL;
uint64_t ts;
if (qExecTask(taskInfo, &output, &ts) < 0) {
ASSERT(false);
}
if (!output) {
break;
}
if (!pResult) {
pResult = taosArrayInit(0, sizeof(SSDataBlock));
if (!pResult) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
}
taosArrayPush(pResult, output);
}
if (taosArrayGetSize(pResult) > 0) {
blockDebugShowData(pResult);
STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
SSubmitReq *pReq = NULL;
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), uid, suid) != 0) {
taosArrayDestroy(pResult);
return TSDB_CODE_FAILED;
}
if (tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) != 0) {
taosArrayDestroy(pResult);
taosMemoryFreeClear(pReq);
return TSDB_CODE_FAILED;
}
taosMemoryFreeClear(pReq);
} else {
smaWarn("vgId:%d no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno));
}
taosArrayDestroy(pResult);
return TSDB_CODE_SUCCESS;
}
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid, tb_uid_t uid) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pEnv) {
// only applicable when rsma env exists
return TSDB_CODE_SUCCESS;
}
ASSERT(uid != 0); // TODO: remove later
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL;
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
smaDebug("vgId:%d no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
return TSDB_CODE_SUCCESS;
}
if (!pRSmaInfo->taskInfo[0]) {
smaDebug("vgId:%d no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid);
return TSDB_CODE_SUCCESS;
}
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
// TODO: use the proper schema instead of 0, and cache STSchema in cache
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, 0);
if (!pTSchema) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return TSDB_CODE_FAILED;
}
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1);
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2);
taosMemoryFree(pTSchema);
}
return TSDB_CODE_SUCCESS;
}
int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pEnv) {
// only applicable when rsma env exists
return TSDB_CODE_SUCCESS;
}
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
STbUidStore uidStore = {0};
tdFetchSubmitReqSuids(pMsg, &uidStore);
if (uidStore.suid != 0) {
tdExecuteRSma(pSma, pMsg, inputType, uidStore.suid, uidStore.uid);
void *pIter = taosHashIterate(uidStore.uidHash, NULL);
while (pIter) {
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
tdExecuteRSma(pSma, pMsg, inputType, *pTbSuid, 0);
pIter = taosHashIterate(uidStore.uidHash, pIter);
}
tdUidStoreDestory(&uidStore);
}
}
return TSDB_CODE_SUCCESS;
}

View File

@ -0,0 +1,128 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#include "sma.h"
int32_t smaOpenDBEnv(TENV **ppEnv, const char *path) {
int ret = 0;
if (path == NULL) return -1;
ret = tdbEnvOpen(path, 4096, 256, ppEnv); // use as param
if (ret != 0) {
smaError("failed to create tsdb db env, ret = %d", ret);
return -1;
}
return 0;
}
int32_t smaCloseDBEnv(TENV *pEnv) { return tdbEnvClose(pEnv); }
static inline int tdSmaKeyCmpr(const void *arg1, int len1, const void *arg2, int len2) {
const SSmaKey *pKey1 = (const SSmaKey *)arg1;
const SSmaKey *pKey2 = (const SSmaKey *)arg2;
ASSERT(len1 == len2 && len1 == sizeof(SSmaKey));
if (pKey1->skey < pKey2->skey) {
return -1;
} else if (pKey1->skey > pKey2->skey) {
return 1;
}
if (pKey1->groupId < pKey2->groupId) {
return -1;
} else if (pKey1->groupId > pKey2->groupId) {
return 1;
}
return 0;
}
static int32_t smaOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) {
int ret;
tdb_cmpr_fn_t compFunc;
// Create a database
compFunc = tdSmaKeyCmpr;
ret = tdbDbOpen(pFName, -1, -1, compFunc, pEnv, ppDB);
return 0;
}
static int32_t smaCloseDBDb(TDB *pDB) { return tdbDbClose(pDB); }
int32_t smaOpenDBF(TENV *pEnv, SDBFile *pDBF) {
// TEnv is shared by a group of SDBFile
if (!pEnv || !pDBF) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
// Open DBF
if (smaOpenDBDb(&(pDBF->pDB), pEnv, pDBF->path) < 0) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
smaCloseDBDb(pDBF->pDB);
return -1;
}
return 0;
}
int32_t smaCloseDBF(SDBFile *pDBF) {
int32_t ret = 0;
if (pDBF->pDB) {
ret = smaCloseDBDb(pDBF->pDB);
pDBF->pDB = NULL;
}
taosMemoryFreeClear(pDBF->path);
return ret;
}
int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) {
int32_t ret;
ret = tdbDbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
if (ret < 0) {
smaError("failed to create insert sma data into db, ret = %d", ret);
return -1;
}
return 0;
}
void *smaGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_t *valLen) {
void *pVal = NULL;
int ret;
ret = tdbDbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
if (ret < 0) {
smaError("failed to get sma data from db, ret = %d", ret);
return NULL;
}
ASSERT(*valLen >= 0);
// TODO: lock?
// TODO: Would the key/value be destoryed during return the data?
// TODO: How about the key is updated while value length is changed? The original value buffer would be freed
// automatically?
return pVal;
}

File diff suppressed because it is too large Load Diff

View File

@ -234,7 +234,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
if (msgType != TDMT_VND_SUBMIT) return 0;
// make sure msgType == TDMT_VND_SUBMIT
if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) {
if (tdUpdateExpireWindow(pTq->pVnode->pSma, msg, ver) != 0) {
return -1;
}

View File

@ -16,6 +16,8 @@
#include "tsdb.h"
int tsdbBegin(STsdb *pTsdb) {
if (!pTsdb) return 0;
STsdbMemTable *pMem;
if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) {

View File

@ -37,12 +37,12 @@ static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired);
// static int tsdbProcessExpiredFS(STsdb *pRepo);
// static int tsdbCreateMeta(STsdb *pRepo);
static void tsdbGetRootDir(int repoid, int8_t level, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s", repoid, TSDB_LEVEL_DNAME[level]);
static void tsdbGetRootDir(int repoid, const char* dir, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s", repoid, dir);
}
static void tsdbGetDataDir(int repoid, int8_t level, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data", repoid, TSDB_LEVEL_DNAME[level]);
static void tsdbGetDataDir(int repoid, const char* dir, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data", repoid, dir);
}
// For backward compatibility
@ -591,7 +591,7 @@ static int tsdbComparFidFSet(const void *arg1, const void *arg2) {
static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/%s/%s", tfsGetPrimaryPath(REPO_TFS(pRepo)), REPO_ID(pRepo),
TSDB_LEVEL_DNAME[REPO_LEVEL(pRepo)], tsdbTxnFname[ftype]);
pRepo->dir, tsdbTxnFname[ftype]);
}
static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
@ -721,7 +721,7 @@ static int tsdbScanRootDir(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
const STfsFile *pf;
tsdbGetRootDir(REPO_ID(pRepo), REPO_LEVEL(pRepo), rootDir);
tsdbGetRootDir(REPO_ID(pRepo), pRepo->dir, rootDir);
STfsDir *tdir = tfsOpendir(REPO_TFS(pRepo), rootDir);
if (tdir == NULL) {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno));
@ -755,7 +755,7 @@ static int tsdbScanDataDir(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
const STfsFile *pf;
tsdbGetDataDir(REPO_ID(pRepo), REPO_LEVEL(pRepo), dataDir);
tsdbGetDataDir(REPO_ID(pRepo), pRepo->dir, dataDir);
STfsDir *tdir = tfsOpendir(REPO_TFS(pRepo), dataDir);
if (tdir == NULL) {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno));
@ -803,7 +803,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
regex_t regex;
STsdbFS *pfs = REPO_FS(pRepo);
tsdbGetDataDir(REPO_ID(pRepo), REPO_LEVEL(pRepo), dataDir);
tsdbGetDataDir(REPO_ID(pRepo), pRepo->dir, dataDir);
// Resource allocation and init
regcomp(&regex, pattern, REG_EXTENDED);

View File

@ -23,14 +23,6 @@ static const char *TSDB_FNAME_SUFFIX[] = {
"smal", // TSDB_FILE_SMAL
"", // TSDB_FILE_MAX
"meta", // TSDB_FILE_META
"tsma", // TSDB_FILE_TSMA
"rsma", // TSDB_FILE_RSMA
};
const char *TSDB_LEVEL_DNAME[] = {
"tsdb",
"rsma1",
"rsma2",
};
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, const char* dname, char *fname);
@ -51,7 +43,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
pDFile->info.fver = tsdbGetDFSVersion(ftype);
tsdbGetFilename(REPO_ID(pRepo), fid, ver, ftype, TSDB_LEVEL_DNAME[pRepo->level], fname);
tsdbGetFilename(REPO_ID(pRepo), fid, ver, ftype, pRepo->dir, fname);
tfsInitFile(REPO_TFS(pRepo), &(pDFile->f), did, fname);
}

View File

@ -15,100 +15,17 @@
#include "tsdb.h"
#define TSDB_OPEN_RSMA_IMPL(v, l) \
do { \
SRetention *r = VND_RETENTIONS(v)[0]; \
if (RETENTION_VALID(r)) { \
return tsdbOpenImpl((v), type, &VND_RSMA##l(v), VNODE_RSMA##l##_DIR, TSDB_RETENTION_L##l); \
} \
} while (0)
static int tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg);
#define TSDB_SET_KEEP_CFG(l) \
do { \
SRetention *r = &pCfg->retentions[l]; \
pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \
pKeepCfg->keep0 = pKeepCfg->keep2; \
pKeepCfg->keep1 = pKeepCfg->keep2; \
pKeepCfg->days = tsdbEvalDays(r, pCfg->precision); \
} while (0)
#define RETENTION_DAYS_SPLIT_RATIO 10
#define RETENTION_DAYS_SPLIT_MIN 1
#define RETENTION_DAYS_SPLIT_MAX 30
// implementation
static int32_t tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int8_t type);
static int32_t tsdbEvalDays(SRetention *r, int8_t precision);
static int32_t tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level);
int tsdbOpen(SVnode *pVnode, int8_t type) {
switch (type) {
case TSDB_TYPE_TSDB:
return tsdbOpenImpl(pVnode, type, &VND_TSDB(pVnode), VNODE_TSDB_DIR, TSDB_RETENTION_L0);
case TSDB_TYPE_TSMA:
ASSERT(0);
break;
case TSDB_TYPE_RSMA_L0:
TSDB_OPEN_RSMA_IMPL(pVnode, 0);
break;
case TSDB_TYPE_RSMA_L1:
TSDB_OPEN_RSMA_IMPL(pVnode, 1);
break;
case TSDB_TYPE_RSMA_L2:
TSDB_OPEN_RSMA_IMPL(pVnode, 2);
break;
default:
ASSERT(0);
break;
}
return 0;
}
static int32_t tsdbEvalDays(SRetention *r, int8_t precision) {
int32_t keepDays = convertTimeFromPrecisionToUnit(r->keep, precision, TIME_UNIT_DAY);
int32_t freqDays = convertTimeFromPrecisionToUnit(r->freq, precision, TIME_UNIT_DAY);
int32_t days = keepDays / RETENTION_DAYS_SPLIT_RATIO;
if (days <= RETENTION_DAYS_SPLIT_MIN) {
days = RETENTION_DAYS_SPLIT_MIN;
if (days < freqDays) {
days = freqDays + 1;
}
} else {
if (days > RETENTION_DAYS_SPLIT_MAX) {
days = RETENTION_DAYS_SPLIT_MAX;
}
if (days < freqDays) {
days = freqDays + 1;
}
}
return days * 1440;
}
static int32_t tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int8_t type) {
static int tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg) {
pKeepCfg->precision = pCfg->precision;
switch (type) {
case TSDB_TYPE_TSDB:
pKeepCfg->days = pCfg->days;
pKeepCfg->keep0 = pCfg->keep0;
pKeepCfg->keep1 = pCfg->keep1;
pKeepCfg->keep2 = pCfg->keep2;
break;
case TSDB_TYPE_TSMA:
ASSERT(0);
break;
case TSDB_TYPE_RSMA_L0:
TSDB_SET_KEEP_CFG(0);
break;
case TSDB_TYPE_RSMA_L1:
TSDB_SET_KEEP_CFG(1);
break;
case TSDB_TYPE_RSMA_L2:
TSDB_SET_KEEP_CFG(2);
break;
default:
ASSERT(0);
break;
}
pKeepCfg->days = pCfg->days;
pKeepCfg->keep0 = pCfg->keep0;
pKeepCfg->keep1 = pCfg->keep1;
pKeepCfg->keep2 = pCfg->keep2;
return 0;
}
@ -116,18 +33,16 @@ static int32_t tsdbSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int8_t typ
* @brief
*
* @param pVnode
* @param type
* @param ppTsdb
* @param dir
* @param level retention level
* @return int
*/
int32_t tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level) {
int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKeepCfg) {
STsdb *pTsdb = NULL;
int slen = 0;
*ppTsdb = NULL;
slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(dir) + 3;
slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(dir) + TSDB_DATA_DIR_LEN + 3;
// create handle
pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(*pTsdb) + slen);
@ -136,13 +51,18 @@ int32_t tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *di
return -1;
}
ASSERT(strlen(dir) < TSDB_DATA_DIR_LEN);
memcpy(pTsdb->dir, dir, strlen(dir));
pTsdb->path = (char *)&pTsdb[1];
sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, dir);
pTsdb->pVnode = pVnode;
pTsdb->level = level;
pTsdb->repoLocked = false;
taosThreadMutexInit(&pTsdb->mutex, NULL);
tsdbSetKeepCfg(REPO_KEEP_CFG(pTsdb), REPO_CFG(pTsdb), type);
if (!pKeepCfg) {
tsdbSetKeepCfg(&pTsdb->keepCfg, &pVnode->config.tsdbCfg);
} else {
memcpy(&pTsdb->keepCfg, pKeepCfg, sizeof(STsdbKeepCfg));
}
pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb));
// create dir (TODO: use tfsMkdir)
@ -163,12 +83,13 @@ _err:
return -1;
}
int tsdbClose(STsdb *pTsdb) {
if (pTsdb) {
int tsdbClose(STsdb **pTsdb) {
if (*pTsdb) {
// TODO: destroy mem/imem
tsdbCloseFS(pTsdb);
tsdbFreeFS(pTsdb->fs);
taosMemoryFree(pTsdb);
taosThreadMutexDestroy(&(*pTsdb)->mutex);
tsdbCloseFS(*pTsdb);
tsdbFreeFS((*pTsdb)->fs);
taosMemoryFreeClear(*pTsdb);
}
return 0;
}

View File

@ -47,7 +47,7 @@ int vnodeBegin(SVnode *pVnode) {
}
// begin tsdb
if (vnodeIsRollup(pVnode)) {
if (pVnode->pSma) {
if (tsdbBegin(VND_RSMA0(pVnode)) < 0) {
vError("vgId:%d failed to begin rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;

View File

@ -96,26 +96,15 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
}
// open tsdb
if (vnodeIsRollup(pVnode)) {
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L0) < 0) {
vError("vgId:%d failed to open vnode rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
if (!vnodeIsRollup(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, TSDB_TYPE_TSDB) < 0) {
vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L1) < 0) {
vError("vgId:%d failed to open vnode rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L2) < 0) {
vError("vgId:%d failed to open vnode rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
} else {
if (tsdbOpen(pVnode, TSDB_TYPE_TSDB) < 0) {
vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open sma
if (smaOpen(pVnode)) {
vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open wal
@ -161,10 +150,10 @@ _err:
if (pVnode->pQuery) vnodeQueryClose(pVnode);
if (pVnode->pTq) tqClose(pVnode->pTq);
if (pVnode->pWal) walClose(pVnode->pWal);
if (pVnode->pTsdb) tsdbClose(pVnode->pTsdb);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
if (pVnode->pMeta) metaClose(pVnode->pMeta);
tsdbClose(VND_RSMA1(pVnode));
tsdbClose(VND_RSMA2(pVnode));
if (pVnode->pSma) smaClose(pVnode->pSma);
tsem_destroy(&(pVnode->canCommit));
taosMemoryFree(pVnode);
return NULL;
@ -177,9 +166,8 @@ void vnodeClose(SVnode *pVnode) {
vnodeQueryClose(pVnode);
walClose(pVnode->pWal);
tqClose(pVnode->pTq);
tsdbClose(VND_TSDB(pVnode));
tsdbClose(VND_RSMA1(pVnode));
tsdbClose(VND_RSMA2(pVnode));
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
smaClose(pVnode->pSma);
metaClose(pVnode->pMeta);
vnodeCloseBufPool(pVnode);
// destroy handle

View File

@ -22,6 +22,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
#if 0
@ -86,10 +87,8 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
case TDMT_VND_DROP_TABLE:
if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// TODO
}
case TDMT_VND_CREATE_SMA: {
if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
} break;
/* TSDB */
case TDMT_VND_SUBMIT:
@ -195,7 +194,7 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// TODO
// blockDebugShowData(data);
tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data);
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
}
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
@ -305,7 +304,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq,
goto _err;
}
tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &req, &pVnode->msgCb);
tdProcessRSmaCreate(pVnode->pSma, pVnode->pMeta, &req, &pVnode->msgCb);
tDecoderClear(&coder);
return 0;
@ -366,7 +365,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
}
} else {
cRsp.code = TSDB_CODE_SUCCESS;
tsdbFetchTbUidList(pVnode->pTsdb, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
}
taosArrayPush(rsp.pArray, &cRsp);
@ -374,8 +373,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
tDecoderClear(&decoder);
tsdbUpdateTbUidList(pVnode->pTsdb, pStore);
tsdbUidStoreFree(pStore);
tdUpdateTbUidList(pVnode->pSma, pStore);
tdUidStoreFree(pStore);
// prepare rsp
SEncoder encoder = {0};
@ -649,8 +648,38 @@ _exit:
// TODO: refactor
if ((terrno == TSDB_CODE_SUCCESS || terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) &&
(pRsp->code == TSDB_CODE_SUCCESS)) {
tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
}
return 0;
}
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
SVCreateTSmaReq req = {0};
SDecoder coder;
pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
pRsp->contLen = 0;
// decode and process req
tDecoderInit(&coder, pReq, len);
if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
pRsp->code = terrno;
goto _err;
}
if (metaCreateTSma(pVnode->pMeta, version, &req) < 0) {
pRsp->code = terrno;
goto _err;
}
tDecoderClear(&coder);
return 0;
_err:
tDecoderClear(&coder);
return -1;
}

View File

@ -2949,6 +2949,8 @@ static int32_t translateCreateIndex(STranslateContext* pCxt, SCreateIndexStmt* p
}
static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt) {
SEncoder encoder = {0};
int32_t contLen = 0;
SVDropTSmaReq dropSmaReq = {0};
strcpy(dropSmaReq.indexName, pStmt->indexName);
@ -2956,16 +2958,26 @@ static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t ret = 0;
tEncodeSize(tEncodeSVDropTSmaReq, &dropSmaReq, contLen, ret);
if (ret < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_VND_DROP_SMA;
pCxt->pCmdMsg->msgLen = tSerializeSVDropTSmaReq(NULL, &dropSmaReq);
pCxt->pCmdMsg->msgLen = contLen;
pCxt->pCmdMsg->pMsg = taosMemoryMalloc(pCxt->pCmdMsg->msgLen);
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
void* pBuf = pCxt->pCmdMsg->pMsg;
tSerializeSVDropTSmaReq(&pBuf, &dropSmaReq);
if (tEncodeSVDropTSmaReq(&encoder, &dropSmaReq) < 0) {
tEncoderClear(&encoder);
return TSDB_CODE_OUT_OF_MEMORY;
}
tEncoderClear(&encoder);
return TSDB_CODE_SUCCESS;
}

View File

@ -354,6 +354,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECREATED, "Table re-created")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_SMA_STAT, "Invalid sma state")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TSMA_ALREADY_EXIST, "Tsma already exists")
// query

View File

@ -94,6 +94,7 @@ int32_t tqDebugFlag = 135;
int32_t fsDebugFlag = 135;
int32_t metaDebugFlag = 135;
int32_t fnDebugFlag = 135;
int32_t smaDebugFlag = 135;
int64_t dbgEmptyW = 0;
int64_t dbgWN = 0;
@ -755,6 +756,7 @@ void taosSetAllDebugFlag(int32_t flag) {
tqDebugFlag = flag;
fsDebugFlag = flag;
fnDebugFlag = flag;
smaDebugFlag = flag;
uInfo("all debug flag are set to %d", flag);
}