Merge pull request #13719 from taosdata/feature/TD-14481-3.0
refactor: tsma
This commit is contained in:
commit
f0652ba4ff
|
@ -2344,15 +2344,17 @@ typedef struct {
|
||||||
char indexName[TSDB_INDEX_NAME_LEN];
|
char indexName[TSDB_INDEX_NAME_LEN];
|
||||||
int32_t exprLen;
|
int32_t exprLen;
|
||||||
int32_t tagsFilterLen;
|
int32_t tagsFilterLen;
|
||||||
int32_t numOfVgroups;
|
|
||||||
int64_t indexUid;
|
int64_t indexUid;
|
||||||
tb_uid_t tableUid; // super/child/common table uid
|
tb_uid_t tableUid; // super/child/common table uid
|
||||||
|
tb_uid_t dstTbUid; // for dstVgroup
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
int64_t offset; // use unit by precision of DB
|
int64_t offset; // use unit by precision of DB
|
||||||
int64_t sliding;
|
int64_t sliding;
|
||||||
|
char* dstTbName; // for dstVgroup
|
||||||
char* expr; // sma expression
|
char* expr; // sma expression
|
||||||
char* tagsFilter;
|
char* tagsFilter;
|
||||||
SVgEpSet* pVgEpSet;
|
SSchemaWrapper schemaRow; // for dstVgroup
|
||||||
|
SSchemaWrapper schemaTag; // for dstVgroup
|
||||||
} STSma; // Time-range-wise SMA
|
} STSma; // Time-range-wise SMA
|
||||||
|
|
||||||
typedef STSma SVCreateTSmaReq;
|
typedef STSma SVCreateTSmaReq;
|
||||||
|
@ -2437,27 +2439,6 @@ static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t indexUid;
|
|
||||||
STimeWindow queryWindow;
|
|
||||||
} SVGetTsmaExpWndsReq;
|
|
||||||
|
|
||||||
#define SMA_WNDS_EXPIRE_FLAG (0x1)
|
|
||||||
#define SMA_WNDS_IS_EXPIRE(flag) (((flag)&SMA_WNDS_EXPIRE_FLAG) != 0)
|
|
||||||
#define SMA_WNDS_SET_EXPIRE(flag) ((flag) |= SMA_WNDS_EXPIRE_FLAG)
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t indexUid;
|
|
||||||
int8_t flags; // 0x1 all window expired
|
|
||||||
int32_t numExpWnds;
|
|
||||||
TSKEY wndSKeys[];
|
|
||||||
} SVGetTsmaExpWndsRsp;
|
|
||||||
|
|
||||||
int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder* pCoder, const SVGetTsmaExpWndsReq* pReq);
|
|
||||||
int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder* pCoder, SVGetTsmaExpWndsReq* pReq);
|
|
||||||
int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder* pCoder, const SVGetTsmaExpWndsRsp* pReq);
|
|
||||||
int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder* pCoder, SVGetTsmaExpWndsRsp* pReq);
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int idx;
|
int idx;
|
||||||
} SMCreateFullTextReq;
|
} SMCreateFullTextReq;
|
||||||
|
@ -2517,6 +2498,7 @@ typedef struct {
|
||||||
|
|
||||||
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
|
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
|
||||||
int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp);
|
int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp);
|
||||||
|
|
||||||
void tFreeSTableIndexInfo(void* pInfo);
|
void tFreeSTableIndexInfo(void* pInfo);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -190,7 +190,6 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_GET_TSMA_EXP_WNDS, "vnode-get-tsma-expired-windows", SVGetTsmaExpWndsReq, SVGetTsmaExpWndsRsp)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
|
||||||
|
|
|
@ -352,9 +352,6 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0619)
|
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0619)
|
||||||
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x061A)
|
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x061A)
|
||||||
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061B)
|
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061B)
|
||||||
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x061C)
|
|
||||||
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x061D)
|
|
||||||
#define TSDB_CODE_TDB_TSMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x061E)
|
|
||||||
|
|
||||||
// query
|
// query
|
||||||
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
|
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
|
||||||
|
@ -685,6 +682,19 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_SML_INVALID_DATA TAOS_DEF_ERROR_CODE(0, 0x3002)
|
#define TSDB_CODE_SML_INVALID_DATA TAOS_DEF_ERROR_CODE(0, 0x3002)
|
||||||
#define TSDB_CODE_SML_INVALID_DB_CONF TAOS_DEF_ERROR_CODE(0, 0x3003)
|
#define TSDB_CODE_SML_INVALID_DB_CONF TAOS_DEF_ERROR_CODE(0, 0x3003)
|
||||||
|
|
||||||
|
//tsma
|
||||||
|
#define TSDB_CODE_TSMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x3100)
|
||||||
|
#define TSDB_CODE_TSMA_NO_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x3101)
|
||||||
|
#define TSDB_CODE_TSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3102)
|
||||||
|
#define TSDB_CODE_TSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3103)
|
||||||
|
#define TSDB_CODE_TSMA_NO_INDEX_IN_CACHE TAOS_DEF_ERROR_CODE(0, 0x3104)
|
||||||
|
#define TSDB_CODE_TSMA_RM_SKEY_IN_HASH TAOS_DEF_ERROR_CODE(0, 0x3105)
|
||||||
|
|
||||||
|
//rsma
|
||||||
|
#define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)
|
||||||
|
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1634,6 +1634,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
|
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
|
||||||
|
if (pCol->type == pColInfoData->info.type) {
|
||||||
|
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, var, true, offset,
|
||||||
|
k);
|
||||||
|
} else {
|
||||||
char tv[8] = {0};
|
char tv[8] = {0};
|
||||||
if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
|
if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
|
||||||
float v = 0;
|
float v = 0;
|
||||||
|
@ -1652,7 +1656,9 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
||||||
GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
|
GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
|
||||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||||
}
|
}
|
||||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset, k);
|
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset,
|
||||||
|
k);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
|
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
|
||||||
TASSERT(0);
|
TASSERT(0);
|
||||||
|
|
|
@ -3877,9 +3877,10 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
|
||||||
if (tEncodeCStr(pCoder, pSma->indexName) < 0) return -1;
|
if (tEncodeCStr(pCoder, pSma->indexName) < 0) return -1;
|
||||||
if (tEncodeI32(pCoder, pSma->exprLen) < 0) return -1;
|
if (tEncodeI32(pCoder, pSma->exprLen) < 0) return -1;
|
||||||
if (tEncodeI32(pCoder, pSma->tagsFilterLen) < 0) return -1;
|
if (tEncodeI32(pCoder, pSma->tagsFilterLen) < 0) return -1;
|
||||||
if (tEncodeI32(pCoder, pSma->numOfVgroups) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pSma->indexUid) < 0) return -1;
|
if (tEncodeI64(pCoder, pSma->indexUid) < 0) return -1;
|
||||||
if (tEncodeI64(pCoder, pSma->tableUid) < 0) return -1;
|
if (tEncodeI64(pCoder, pSma->tableUid) < 0) return -1;
|
||||||
|
if (tEncodeI64(pCoder, pSma->dstTbUid) < 0) return -1;
|
||||||
|
if (tEncodeCStr(pCoder, pSma->dstTbName) < 0) return -1;
|
||||||
if (tEncodeI64(pCoder, pSma->interval) < 0) return -1;
|
if (tEncodeI64(pCoder, pSma->interval) < 0) return -1;
|
||||||
if (tEncodeI64(pCoder, pSma->offset) < 0) return -1;
|
if (tEncodeI64(pCoder, pSma->offset) < 0) return -1;
|
||||||
if (tEncodeI64(pCoder, pSma->sliding) < 0) return -1;
|
if (tEncodeI64(pCoder, pSma->sliding) < 0) return -1;
|
||||||
|
@ -3889,17 +3890,10 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
|
||||||
if (pSma->tagsFilterLen > 0) {
|
if (pSma->tagsFilterLen > 0) {
|
||||||
if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1;
|
if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1;
|
||||||
}
|
}
|
||||||
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
|
|
||||||
if (tEncodeI32(pCoder, pSma->pVgEpSet[v].vgId) < 0) return -1;
|
tEncodeSSchemaWrapper(pCoder, &pSma->schemaRow);
|
||||||
if (tEncodeI8(pCoder, pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
|
tEncodeSSchemaWrapper(pCoder, &pSma->schemaTag);
|
||||||
int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps;
|
|
||||||
if (tEncodeI8(pCoder, numOfEps) < 0) return -1;
|
|
||||||
for (int32_t n = 0; n < numOfEps; ++n) {
|
|
||||||
const SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n];
|
|
||||||
if (tEncodeCStr(pCoder, pEp->fqdn) < 0) return -1;
|
|
||||||
if (tEncodeU16(pCoder, pEp->port) < 0) return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3907,14 +3901,15 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
|
||||||
if (tDecodeI8(pCoder, &pSma->version) < 0) return -1;
|
if (tDecodeI8(pCoder, &pSma->version) < 0) return -1;
|
||||||
if (tDecodeI8(pCoder, &pSma->intervalUnit) < 0) return -1;
|
if (tDecodeI8(pCoder, &pSma->intervalUnit) < 0) return -1;
|
||||||
if (tDecodeI8(pCoder, &pSma->slidingUnit) < 0) return -1;
|
if (tDecodeI8(pCoder, &pSma->slidingUnit) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pSma->dstVgId) < 0) return -1;
|
|
||||||
if (tDecodeI8(pCoder, &pSma->timezoneInt) < 0) return -1;
|
if (tDecodeI8(pCoder, &pSma->timezoneInt) < 0) return -1;
|
||||||
|
if (tDecodeI32(pCoder, &pSma->dstVgId) < 0) return -1;
|
||||||
if (tDecodeCStrTo(pCoder, pSma->indexName) < 0) return -1;
|
if (tDecodeCStrTo(pCoder, pSma->indexName) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1;
|
if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pSma->tagsFilterLen) < 0) return -1;
|
if (tDecodeI32(pCoder, &pSma->tagsFilterLen) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pSma->numOfVgroups) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pSma->indexUid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pSma->indexUid) < 0) return -1;
|
||||||
if (tDecodeI64(pCoder, &pSma->tableUid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pSma->tableUid) < 0) return -1;
|
||||||
|
if (tDecodeI64(pCoder, &pSma->dstTbUid) < 0) return -1;
|
||||||
|
if (tDecodeCStr(pCoder, &pSma->dstTbName) < 0) return -1;
|
||||||
if (tDecodeI64(pCoder, &pSma->interval) < 0) return -1;
|
if (tDecodeI64(pCoder, &pSma->interval) < 0) return -1;
|
||||||
if (tDecodeI64(pCoder, &pSma->offset) < 0) return -1;
|
if (tDecodeI64(pCoder, &pSma->offset) < 0) return -1;
|
||||||
if (tDecodeI64(pCoder, &pSma->sliding) < 0) return -1;
|
if (tDecodeI64(pCoder, &pSma->sliding) < 0) return -1;
|
||||||
|
@ -3928,27 +3923,9 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
|
||||||
} else {
|
} else {
|
||||||
pSma->tagsFilter = NULL;
|
pSma->tagsFilter = NULL;
|
||||||
}
|
}
|
||||||
if (pSma->numOfVgroups > 0) {
|
// only needed in dstVgroup
|
||||||
pSma->pVgEpSet = (SVgEpSet *)tDecoderMalloc(pCoder, pSma->numOfVgroups * sizeof(SVgEpSet));
|
tDecodeSSchemaWrapperEx(pCoder, &pSma->schemaRow);
|
||||||
if (!pSma->pVgEpSet) {
|
tDecodeSSchemaWrapperEx(pCoder, &pSma->schemaTag);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
memset(pSma->pVgEpSet, 0, pSma->numOfVgroups * sizeof(SVgEpSet));
|
|
||||||
|
|
||||||
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
|
|
||||||
if (tDecodeI32(pCoder, &pSma->pVgEpSet[v].vgId) < 0) return -1;
|
|
||||||
if (tDecodeI8(pCoder, &pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
|
|
||||||
if (tDecodeI8(pCoder, &pSma->pVgEpSet[v].epSet.numOfEps) < 0) return -1;
|
|
||||||
int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps;
|
|
||||||
for (int32_t n = 0; n < numOfEps; ++n) {
|
|
||||||
SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n];
|
|
||||||
if (tDecodeCStrTo(pCoder, pEp->fqdn) < 0) return -1;
|
|
||||||
if (tDecodeU16(pCoder, &pEp->port) < 0) return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -3991,55 +3968,6 @@ int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder *pCoder, const SVGetTsmaExpWndsReq *pReq) {
|
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
|
||||||
|
|
||||||
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pReq->queryWindow.skey) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pReq->queryWindow.ekey) < 0) return -1;
|
|
||||||
|
|
||||||
tEndEncode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder *pCoder, SVGetTsmaExpWndsReq *pReq) {
|
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
|
||||||
|
|
||||||
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pReq->queryWindow.skey) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pReq->queryWindow.ekey) < 0) return -1;
|
|
||||||
|
|
||||||
tEndDecode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder *pCoder, const SVGetTsmaExpWndsRsp *pReq) {
|
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
|
||||||
|
|
||||||
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
|
|
||||||
if (tEncodeI8(pCoder, pReq->flags) < 0) return -1;
|
|
||||||
if (tEncodeI32(pCoder, pReq->numExpWnds) < 0) return -1;
|
|
||||||
for (int32_t i = 0; i < pReq->numExpWnds; ++i) {
|
|
||||||
if (tEncodeI64(pCoder, pReq->wndSKeys[i]) < 0) return -1;
|
|
||||||
}
|
|
||||||
tEndEncode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder *pCoder, SVGetTsmaExpWndsRsp *pReq) {
|
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
|
||||||
|
|
||||||
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
|
|
||||||
if (tDecodeI8(pCoder, &pReq->flags) < 0) return -1;
|
|
||||||
if (tDecodeI32(pCoder, &pReq->numExpWnds) < 0) return -1;
|
|
||||||
for (int32_t i = 0; i < pReq->numExpWnds; ++i) {
|
|
||||||
if (tDecodeI64(pCoder, &pReq->wndSKeys[i]) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
|
int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
|
||||||
int32_t headLen = sizeof(SMsgHead);
|
int32_t headLen = sizeof(SMsgHead);
|
||||||
if (buf != NULL) {
|
if (buf != NULL) {
|
||||||
|
|
|
@ -301,15 +301,16 @@ typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
char stb[TSDB_TABLE_FNAME_LEN];
|
char stb[TSDB_TABLE_FNAME_LEN];
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
|
char dstTbName[TSDB_TABLE_FNAME_LEN];
|
||||||
int64_t createdTime;
|
int64_t createdTime;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
int64_t stbUid;
|
int64_t stbUid;
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
|
int64_t dstTbUid;
|
||||||
int8_t intervalUnit;
|
int8_t intervalUnit;
|
||||||
int8_t slidingUnit;
|
int8_t slidingUnit;
|
||||||
int8_t timezone;
|
int8_t timezone;
|
||||||
int32_t dstVgId; // for stream
|
int32_t dstVgId; // for stream
|
||||||
int64_t dstTbUid;
|
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
int64_t sliding;
|
int64_t sliding;
|
||||||
|
@ -317,12 +318,12 @@ typedef struct {
|
||||||
int32_t tagsFilterLen;
|
int32_t tagsFilterLen;
|
||||||
int32_t sqlLen;
|
int32_t sqlLen;
|
||||||
int32_t astLen;
|
int32_t astLen;
|
||||||
int32_t numOfVgroups;
|
|
||||||
char* expr;
|
char* expr;
|
||||||
char* tagsFilter;
|
char* tagsFilter;
|
||||||
char* sql;
|
char* sql;
|
||||||
char* ast;
|
char* ast;
|
||||||
SVgEpSet* pVgEpSet;
|
SSchemaWrapper schemaRow; // for dstVgroup
|
||||||
|
SSchemaWrapper schemaTag; // for dstVgroup
|
||||||
} SSmaObj;
|
} SSmaObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
|
#include "parser.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
|
||||||
#define TSDB_SMA_VER_NUMBER 1
|
#define TSDB_SMA_VER_NUMBER 1
|
||||||
|
@ -82,10 +83,12 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
|
||||||
|
SDB_SET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_DB_FNAME_LEN, _OVER)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pSma->createdTime, _OVER)
|
SDB_SET_INT64(pRaw, dataPos, pSma->createdTime, _OVER)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pSma->uid, _OVER)
|
SDB_SET_INT64(pRaw, dataPos, pSma->uid, _OVER)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pSma->stbUid, _OVER)
|
SDB_SET_INT64(pRaw, dataPos, pSma->stbUid, _OVER)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pSma->dbUid, _OVER)
|
SDB_SET_INT64(pRaw, dataPos, pSma->dbUid, _OVER)
|
||||||
|
SDB_SET_INT64(pRaw, dataPos, pSma->dstTbUid, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pSma->intervalUnit, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pSma->intervalUnit, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pSma->slidingUnit, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pSma->slidingUnit, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pSma->timezone, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pSma->timezone, _OVER)
|
||||||
|
@ -147,10 +150,12 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
|
||||||
|
SDB_GET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_DB_FNAME_LEN, _OVER)
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pSma->createdTime, _OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pSma->createdTime, _OVER)
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pSma->uid, _OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pSma->uid, _OVER)
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pSma->stbUid, _OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pSma->stbUid, _OVER)
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pSma->dbUid, _OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pSma->dbUid, _OVER)
|
||||||
|
SDB_GET_INT64(pRaw, dataPos, &pSma->dstTbUid, _OVER)
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pSma->intervalUnit, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &pSma->intervalUnit, _OVER)
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pSma->slidingUnit, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &pSma->slidingUnit, _OVER)
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pSma->timezone, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &pSma->timezone, _OVER)
|
||||||
|
@ -260,13 +265,16 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm
|
||||||
req.tagsFilterLen = pSma->tagsFilterLen;
|
req.tagsFilterLen = pSma->tagsFilterLen;
|
||||||
req.indexUid = pSma->uid;
|
req.indexUid = pSma->uid;
|
||||||
req.tableUid = pSma->stbUid;
|
req.tableUid = pSma->stbUid;
|
||||||
|
req.dstVgId = pSma->dstVgId;
|
||||||
|
req.dstTbUid = pSma->dstTbUid;
|
||||||
req.interval = pSma->interval;
|
req.interval = pSma->interval;
|
||||||
req.offset = pSma->offset;
|
req.offset = pSma->offset;
|
||||||
req.sliding = pSma->sliding;
|
req.sliding = pSma->sliding;
|
||||||
req.expr = pSma->expr;
|
req.expr = pSma->expr;
|
||||||
req.tagsFilter = pSma->tagsFilter;
|
req.tagsFilter = pSma->tagsFilter;
|
||||||
req.numOfVgroups = pSma->numOfVgroups;
|
req.schemaRow = pSma->schemaRow;
|
||||||
req.pVgEpSet = pSma->pVgEpSet;
|
req.schemaTag = pSma->schemaTag;
|
||||||
|
req.dstTbName = pSma->dstTbName;
|
||||||
|
|
||||||
// get length
|
// get length
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
@ -425,14 +433,30 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
// todo add sma info here
|
// todo add sma info here
|
||||||
SVgEpSet *pVgEpSet = NULL;
|
SNode *pAst = NULL;
|
||||||
int32_t numOfVgroups = 0;
|
if (nodesStringToNode(pSma->ast, &pAst) < 0) {
|
||||||
if (mndSmaGetVgEpSet(pMnode, pDb, &pVgEpSet, &numOfVgroups) != 0) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
if (qExtractResultSchema(pAst, &pSma->schemaRow.nCols, &pSma->schemaRow.pSchema) != 0) {
|
||||||
|
nodesDestroyNode(pAst);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
nodesDestroyNode(pAst);
|
||||||
|
pSma->schemaRow.version = 1;
|
||||||
|
|
||||||
|
// TODO: the schemaTag generated by qExtractResultXXX later.
|
||||||
|
pSma->schemaTag.nCols = 1;
|
||||||
|
pSma->schemaTag.version = 1;
|
||||||
|
pSma->schemaTag.pSchema = taosMemoryCalloc(1, sizeof(SSchema));
|
||||||
|
if (!pSma->schemaTag.pSchema) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pSma->schemaTag.pSchema[0].type = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
pSma->schemaTag.pSchema[0].bytes = TYPE_BYTES[TSDB_DATA_TYPE_BIGINT];
|
||||||
|
pSma->schemaTag.pSchema[0].colId = pSma->schemaRow.nCols + PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||||
|
pSma->schemaTag.pSchema[0].flags = 0;
|
||||||
|
snprintf(pSma->schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId");
|
||||||
|
|
||||||
pSma->pVgEpSet = pVgEpSet;
|
|
||||||
pSma->numOfVgroups = numOfVgroups;
|
|
||||||
|
|
||||||
int32_t smaContLen = 0;
|
int32_t smaContLen = 0;
|
||||||
void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
|
void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
|
||||||
|
@ -464,12 +488,15 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
smaObj.createdTime = taosGetTimestampMs();
|
smaObj.createdTime = taosGetTimestampMs();
|
||||||
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
|
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
|
||||||
|
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name);
|
||||||
|
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
|
smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
smaObj.stbUid = pStb->uid;
|
smaObj.stbUid = pStb->uid;
|
||||||
smaObj.dbUid = pStb->dbUid;
|
smaObj.dbUid = pStb->dbUid;
|
||||||
smaObj.intervalUnit = pCreate->intervalUnit;
|
smaObj.intervalUnit = pCreate->intervalUnit;
|
||||||
smaObj.slidingUnit = pCreate->slidingUnit;
|
smaObj.slidingUnit = pCreate->slidingUnit;
|
||||||
smaObj.timezone = pCreate->timezone;
|
smaObj.timezone = pCreate->timezone;
|
||||||
smaObj.dstVgId = pCreate->dstVgId;
|
|
||||||
smaObj.interval = pCreate->interval;
|
smaObj.interval = pCreate->interval;
|
||||||
smaObj.offset = pCreate->offset;
|
smaObj.offset = pCreate->offset;
|
||||||
smaObj.sliding = pCreate->sliding;
|
smaObj.sliding = pCreate->sliding;
|
||||||
|
@ -1088,52 +1115,3 @@ static void mndCancelGetNextSma(SMnode *pMnode, void *pIter) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups) {
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
SVgObj *pVgroup = NULL;
|
|
||||||
void *pIter = NULL;
|
|
||||||
SVgEpSet *pVgEpSet = NULL;
|
|
||||||
int32_t nAllocVgs = 16;
|
|
||||||
int32_t nVgs = 0;
|
|
||||||
|
|
||||||
pVgEpSet = taosMemoryCalloc(nAllocVgs, sizeof(SVgEpSet));
|
|
||||||
if (!pVgEpSet) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
|
||||||
if (pIter == NULL) break;
|
|
||||||
if (pVgroup->dbUid != pDb->uid) {
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nVgs >= nAllocVgs) {
|
|
||||||
void *p = taosMemoryRealloc(pVgEpSet, nAllocVgs * 2 * sizeof(SVgEpSet));
|
|
||||||
if (!p) {
|
|
||||||
taosMemoryFree(pVgEpSet);
|
|
||||||
sdbCancelFetch(pSdb, pIter);
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
pVgEpSet = (SVgEpSet *)p;
|
|
||||||
nAllocVgs *= 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
(pVgEpSet + nVgs)->vgId = pVgroup->vgId;
|
|
||||||
(pVgEpSet + nVgs)->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
|
||||||
|
|
||||||
++nVgs;
|
|
||||||
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
}
|
|
||||||
|
|
||||||
*ppVgEpSet = pVgEpSet;
|
|
||||||
*numOfVgroups = nVgs;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
|
@ -252,8 +252,12 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
|
||||||
}
|
}
|
||||||
|
|
||||||
if (qExtractResultSchema(pAst, (int32_t *)&pStream->outputSchema.nCols, &pStream->outputSchema.pSchema) != 0) {
|
if (qExtractResultSchema(pAst, (int32_t *)&pStream->outputSchema.nCols, &pStream->outputSchema.pSchema) != 0) {
|
||||||
|
nodesDestroyNode(pAst);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
// free
|
||||||
|
nodesDestroyNode(pAst);
|
||||||
|
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
printf("|");
|
printf("|");
|
||||||
|
|
|
@ -28,7 +28,6 @@ target_sources(
|
||||||
|
|
||||||
# sma
|
# sma
|
||||||
"src/sma/sma.c"
|
"src/sma/sma.c"
|
||||||
"src/sma/smaTDBImpl.c"
|
|
||||||
"src/sma/smaEnv.c"
|
"src/sma/smaEnv.c"
|
||||||
"src/sma/smaOpen.c"
|
"src/sma/smaOpen.c"
|
||||||
"src/sma/smaRollup.c"
|
"src/sma/smaRollup.c"
|
||||||
|
|
|
@ -43,34 +43,16 @@ typedef struct SRSmaInfo SRSmaInfo;
|
||||||
struct SSmaEnv {
|
struct SSmaEnv {
|
||||||
TdThreadRwlock lock;
|
TdThreadRwlock lock;
|
||||||
int8_t type;
|
int8_t type;
|
||||||
TXN txn;
|
|
||||||
void *pPool; // SPoolMem
|
|
||||||
SDiskID did;
|
|
||||||
TDB *dbEnv; // TODO: If it's better to put it in smaIndex level?
|
|
||||||
char *path; // relative path
|
|
||||||
SSmaStat *pStat;
|
SSmaStat *pStat;
|
||||||
};
|
};
|
||||||
|
|
||||||
#define SMA_ENV_LOCK(env) ((env)->lock)
|
#define SMA_ENV_LOCK(env) ((env)->lock)
|
||||||
#define SMA_ENV_TYPE(env) ((env)->type)
|
#define SMA_ENV_TYPE(env) ((env)->type)
|
||||||
#define SMA_ENV_DID(env) ((env)->did)
|
|
||||||
#define SMA_ENV_ENV(env) ((env)->dbEnv)
|
|
||||||
#define SMA_ENV_PATH(env) ((env)->path)
|
|
||||||
#define SMA_ENV_STAT(env) ((env)->pStat)
|
#define SMA_ENV_STAT(env) ((env)->pStat)
|
||||||
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
|
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
|
||||||
|
|
||||||
struct SSmaStatItem {
|
struct SSmaStatItem {
|
||||||
/**
|
|
||||||
* @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service.
|
|
||||||
* - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from
|
|
||||||
* Streaming Module or TSDB local persistence.
|
|
||||||
* - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
|
|
||||||
* without information about its previous state.
|
|
||||||
* - TSDB_SMA_STAT_DROPPED: 1)sma dropped
|
|
||||||
* N.B. only applicable to tsma
|
|
||||||
*/
|
|
||||||
int8_t state; // ETsdbSmaStat
|
int8_t state; // ETsdbSmaStat
|
||||||
SHashObj *expiredWindows; // key: skey of time window, value: version
|
|
||||||
STSma *pTSma; // cache schema
|
STSma *pTSma; // cache schema
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -84,29 +66,6 @@ struct SSmaStat {
|
||||||
#define SMA_STAT_ITEMS(s) ((s)->smaStatItems)
|
#define SMA_STAT_ITEMS(s) ((s)->smaStatItems)
|
||||||
#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
|
#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
|
||||||
|
|
||||||
struct SSmaKey {
|
|
||||||
TSKEY skey;
|
|
||||||
int64_t groupId;
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef struct SDBFile SDBFile;
|
|
||||||
|
|
||||||
struct SDBFile {
|
|
||||||
int32_t fid;
|
|
||||||
TTB *pDB;
|
|
||||||
char *path;
|
|
||||||
};
|
|
||||||
|
|
||||||
int32_t tdSmaBeginCommit(SSmaEnv *pEnv);
|
|
||||||
int32_t tdSmaEndCommit(SSmaEnv *pEnv);
|
|
||||||
|
|
||||||
int32_t smaOpenDBEnv(TDB **ppEnv, const char *path);
|
|
||||||
int32_t smaCloseDBEnv(TDB *pEnv);
|
|
||||||
int32_t smaOpenDBF(TDB *pEnv, SDBFile *pDBF);
|
|
||||||
int32_t smaCloseDBF(SDBFile *pDBF);
|
|
||||||
int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn);
|
|
||||||
void *smaGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_t *valLen);
|
|
||||||
|
|
||||||
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
|
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
|
||||||
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
|
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -114,13 +73,6 @@ int32_t tbGetTSmaStatus(SSma *pSma, STSma *param, void *result);
|
||||||
int32_t tbRemoveTSmaData(SSma *pSma, STSma *param, STimeWindow *pWin);
|
int32_t tbRemoveTSmaData(SSma *pSma, STSma *param, STimeWindow *pWin);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tdEncodeTSmaKey(int64_t groupId, TSKEY tsKey, void **pData) {
|
|
||||||
int32_t len = 0;
|
|
||||||
len += taosEncodeFixedI64(pData, tsKey);
|
|
||||||
len += taosEncodeFixedI64(pData, groupId);
|
|
||||||
return len;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tdInitSma(SSma *pSma);
|
int32_t tdInitSma(SSma *pSma);
|
||||||
int32_t tdDropTSma(SSma *pSma, char *pMsg);
|
int32_t tdDropTSma(SSma *pSma, char *pMsg);
|
||||||
int32_t tdDropTSmaData(SSma *pSma, int64_t indexUid);
|
int32_t tdDropTSmaData(SSma *pSma, int64_t indexUid);
|
||||||
|
@ -128,13 +80,11 @@ int32_t tdInsertRSmaData(SSma *pSma, char *msg);
|
||||||
|
|
||||||
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
|
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
|
||||||
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
|
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
|
||||||
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
|
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType, bool onlyCheck);
|
||||||
|
|
||||||
int32_t tdLockSma(SSma *pSma);
|
int32_t tdLockSma(SSma *pSma);
|
||||||
int32_t tdUnLockSma(SSma *pSma);
|
int32_t tdUnLockSma(SSma *pSma);
|
||||||
|
|
||||||
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
|
|
||||||
|
|
||||||
static FORCE_INLINE int16_t tdTSmaAdd(SSma *pSma, int16_t n) { return atomic_add_fetch_16(&SMA_TSMA_NUM(pSma), n); }
|
static FORCE_INLINE int16_t tdTSmaAdd(SSma *pSma, int16_t n) { return atomic_add_fetch_16(&SMA_TSMA_NUM(pSma), n); }
|
||||||
static FORCE_INLINE int16_t tdTSmaSub(SSma *pSma, int16_t n) { return atomic_sub_fetch_16(&SMA_TSMA_NUM(pSma), n); }
|
static FORCE_INLINE int16_t tdTSmaSub(SSma *pSma, int16_t n) { return atomic_sub_fetch_16(&SMA_TSMA_NUM(pSma), n); }
|
||||||
|
|
||||||
|
@ -219,11 +169,8 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SDisk
|
||||||
void *tdFreeRSmaInfo(SRSmaInfo *pInfo);
|
void *tdFreeRSmaInfo(SRSmaInfo *pInfo);
|
||||||
|
|
||||||
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
|
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
|
||||||
int32_t tdUpdateExpiredWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t version);
|
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
|
||||||
// TODO: This is the basic params, and should wrap the params to a queryHandle.
|
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
||||||
int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult);
|
|
||||||
|
|
||||||
int32_t tdGetTSmaDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -150,7 +150,6 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t smaOpen(SVnode* pVnode);
|
int32_t smaOpen(SVnode* pVnode);
|
||||||
int32_t smaClose(SSma* pSma);
|
int32_t smaClose(SSma* pSma);
|
||||||
|
|
||||||
int32_t tdUpdateExpireWindow(SSma* pSma, const SSubmitReq* pMsg, int64_t version);
|
|
||||||
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
||||||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||||
|
|
||||||
|
@ -227,7 +226,7 @@ struct SVnode {
|
||||||
SQHandle* pQuery;
|
SQHandle* pQuery;
|
||||||
};
|
};
|
||||||
|
|
||||||
#define TD_VID(PVNODE) (PVNODE)->config.vgId
|
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
|
||||||
|
|
||||||
#define VND_TSDB(vnd) ((vnd)->pTsdb)
|
#define VND_TSDB(vnd) ((vnd)->pTsdb)
|
||||||
#define VND_RSMA0(vnd) ((vnd)->pTsdb)
|
#define VND_RSMA0(vnd) ((vnd)->pTsdb)
|
||||||
|
|
|
@ -34,13 +34,13 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
|
|
||||||
// validate req
|
// validate req
|
||||||
|
// save smaIndex
|
||||||
metaReaderInit(&mr, pMeta, 0);
|
metaReaderInit(&mr, pMeta, 0);
|
||||||
if (metaGetTableEntryByUid(&mr, pCfg->indexUid) == 0) {
|
if (metaGetTableEntryByUid(&mr, pCfg->indexUid) == 0) {
|
||||||
// TODO: just for pass case
|
|
||||||
#if 1
|
#if 1
|
||||||
terrno = TSDB_CODE_TDB_TSMA_ALREADY_EXIST;
|
terrno = TSDB_CODE_TSMA_ALREADY_EXIST;
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return -1;
|
return -1; // don't goto _err;
|
||||||
#else
|
#else
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -36,25 +36,9 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tdUpdateExpireWindow(SSma* pSma, const SSubmitReq* pMsg, int64_t version) {
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
if ((code = tdUpdateExpiredWindowImpl(pSma, pMsg, version)) < 0) {
|
|
||||||
smaWarn("vgId:%d, update expired sma window failed since %s", SMA_VID(pSma), tstrerror(terrno));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tdGetTSmaData(SSma* pSma, char* pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
if ((code = tdGetTSmaDataImpl(pSma, pData, indexUid, querySKey, nMaxResult)) < 0) {
|
|
||||||
smaWarn("vgId:%d, get tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t* days) {
|
int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t* days) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if ((code = tdGetTSmaDaysImpl(pCfg, pCont, contLen, days)) < 0) {
|
if ((code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days)) < 0) {
|
||||||
smaWarn("vgId:%d, get tsma days failed since %s", pCfg->vgId, tstrerror(terrno));
|
smaWarn("vgId:%d, get tsma days failed since %s", pCfg->vgId, tstrerror(terrno));
|
||||||
}
|
}
|
||||||
smaDebug("vgId:%d, get tsma days %d", pCfg->vgId, *days);
|
smaDebug("vgId:%d, get tsma days %d", pCfg->vgId, *days);
|
||||||
|
|
|
@ -151,31 +151,11 @@ static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path,
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(path && (strlen(path) > 0));
|
|
||||||
SMA_ENV_PATH(pEnv) = strdup(path);
|
|
||||||
if (!SMA_ENV_PATH(pEnv)) {
|
|
||||||
tdFreeSmaEnv(pEnv);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMA_ENV_DID(pEnv) = did;
|
|
||||||
|
|
||||||
if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType) != TSDB_CODE_SUCCESS) {
|
if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType) != TSDB_CODE_SUCCESS) {
|
||||||
tdFreeSmaEnv(pEnv);
|
tdFreeSmaEnv(pEnv);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
char aname[TSDB_FILENAME_LEN] = {0};
|
|
||||||
tfsAbsoluteName(SMA_TFS(pSma), did, path, aname);
|
|
||||||
if (smaOpenDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) {
|
|
||||||
tdFreeSmaEnv(pEnv);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!(pEnv->pPool = openPool())) {
|
|
||||||
tdFreeSmaEnv(pEnv);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pEnv;
|
return pEnv;
|
||||||
}
|
}
|
||||||
|
@ -205,10 +185,7 @@ void tdDestroySmaEnv(SSmaEnv *pSmaEnv) {
|
||||||
if (pSmaEnv) {
|
if (pSmaEnv) {
|
||||||
tdDestroySmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
|
tdDestroySmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
|
||||||
taosMemoryFreeClear(pSmaEnv->pStat);
|
taosMemoryFreeClear(pSmaEnv->pStat);
|
||||||
taosMemoryFreeClear(pSmaEnv->path);
|
|
||||||
taosThreadRwlockDestroy(&(pSmaEnv->lock));
|
taosThreadRwlockDestroy(&(pSmaEnv->lock));
|
||||||
smaCloseDBEnv(pSmaEnv->dbEnv);
|
|
||||||
closePool(pSmaEnv->pPool);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,7 +219,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 1. Lazy mode utilized when init SSmaStat to update expired window(or hungry mode when tdNew).
|
* 1. Lazy mode utilized when init SSmaStat to update expire window(or hungry mode when tdNew).
|
||||||
* 2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
|
* 2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
|
||||||
* tdInitSmaStat invoked in other multithread environment later.
|
* tdInitSmaStat invoked in other multithread environment later.
|
||||||
*/
|
*/
|
||||||
|
@ -280,7 +257,6 @@ void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
|
||||||
if (pSmaStatItem) {
|
if (pSmaStatItem) {
|
||||||
tDestroyTSma(pSmaStatItem->pTSma);
|
tDestroyTSma(pSmaStatItem->pTSma);
|
||||||
taosMemoryFreeClear(pSmaStatItem->pTSma);
|
taosMemoryFreeClear(pSmaStatItem->pTSma);
|
||||||
taosHashCleanup(pSmaStatItem->expiredWindows);
|
|
||||||
taosMemoryFreeClear(pSmaStatItem);
|
taosMemoryFreeClear(pSmaStatItem);
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -341,7 +317,7 @@ int32_t tdUnLockSma(SSma *pSma) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
|
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType, bool onlyCheck) {
|
||||||
SSmaEnv *pEnv = NULL;
|
SSmaEnv *pEnv = NULL;
|
||||||
|
|
||||||
// return if already init
|
// return if already init
|
||||||
|
@ -399,63 +375,3 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t tdSmaBeginCommit(SSmaEnv *pEnv) {
|
|
||||||
TXN *pTxn = &pEnv->txn;
|
|
||||||
// start a new txn
|
|
||||||
tdbTxnOpen(pTxn, 0, poolMalloc, poolFree, pEnv->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
|
||||||
if (tdbBegin(pEnv->dbEnv, pTxn) != 0) {
|
|
||||||
smaWarn("tdSma tdb begin commit fail");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tdSmaEndCommit(SSmaEnv *pEnv) {
|
|
||||||
TXN *pTxn = &pEnv->txn;
|
|
||||||
|
|
||||||
// Commit current txn
|
|
||||||
if (tdbCommit(pEnv->dbEnv, pTxn) != 0) {
|
|
||||||
smaWarn("tdSma tdb end commit fail");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
tdbTxnClose(pTxn);
|
|
||||||
clearPool(pEnv->pPool);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
/**
|
|
||||||
* @brief Get the start TS key of the last data block of one interval/sliding.
|
|
||||||
*
|
|
||||||
* @param pSma
|
|
||||||
* @param param
|
|
||||||
* @param result
|
|
||||||
* @return int32_t
|
|
||||||
* 1) Return 0 and fill the result if the check procedure is normal;
|
|
||||||
* 2) Return -1 if error occurs during the check procedure.
|
|
||||||
*/
|
|
||||||
int32_t tdGetTSmaStatus(SSma *pSma, void *smaIndex, void *result) {
|
|
||||||
const char *procedure = "";
|
|
||||||
if (strncmp(procedure, "get the start TS key of the last data block", 100) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
// fill the result
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Remove the tSma data files related to param between pWin.
|
|
||||||
*
|
|
||||||
* @param pSma
|
|
||||||
* @param param
|
|
||||||
* @param pWin
|
|
||||||
* @return int32_t
|
|
||||||
*/
|
|
||||||
int32_t tdRemoveTSmaData(SSma *pSma, void *smaIndex, STimeWindow *pWin) {
|
|
||||||
// for ("tSmaFiles of param-interval-sliding between pWin") {
|
|
||||||
// // remove the tSmaFile
|
|
||||||
// }
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
|
@ -65,7 +65,7 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA
|
||||||
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t));
|
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t));
|
||||||
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
|
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
|
||||||
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
|
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
|
||||||
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
|
terrno = TSDB_CODE_RSMA_INVALID_STAT;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -132,7 +132,7 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
|
||||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||||
SHashObj *infoHash = NULL;
|
SHashObj *infoHash = NULL;
|
||||||
if (!pStat || !(infoHash = SMA_STAT_INFO_HASH(pStat))) {
|
if (!pStat || !(infoHash = SMA_STAT_INFO_HASH(pStat))) {
|
||||||
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
|
terrno = TSDB_CODE_RSMA_INVALID_STAT;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,13 +167,13 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
|
||||||
*/
|
*/
|
||||||
int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
|
int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
|
||||||
SSma *pSma = pVnode->pSma;
|
SSma *pSma = pVnode->pSma;
|
||||||
SMeta *pMeta = pVnode->pMeta;
|
|
||||||
SMsgCb *pMsgCb = &pVnode->msgCb;
|
|
||||||
if (!pReq->rollup) {
|
if (!pReq->rollup) {
|
||||||
smaTrace("vgId:%d, return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
|
smaTrace("vgId:%d, return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SMeta *pMeta = pVnode->pMeta;
|
||||||
|
SMsgCb *pMsgCb = &pVnode->msgCb;
|
||||||
SRSmaParam *param = &pReq->pRSmaParam;
|
SRSmaParam *param = &pReq->pRSmaParam;
|
||||||
|
|
||||||
if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) {
|
if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) {
|
||||||
|
@ -181,7 +181,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) {
|
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP, false) != TSDB_CODE_SUCCESS) {
|
||||||
terrno = TSDB_CODE_TDB_INIT_FAILED;
|
terrno = TSDB_CODE_TDB_INIT_FAILED;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,130 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#define ALLOW_FORBID_FUNC
|
|
||||||
|
|
||||||
#include "sma.h"
|
|
||||||
|
|
||||||
int32_t smaOpenDBEnv(TDB **ppEnv, const char *path) {
|
|
||||||
int ret = 0;
|
|
||||||
|
|
||||||
if (path == NULL) return -1;
|
|
||||||
|
|
||||||
ret = tdbOpen(path, 4096, 256, ppEnv); // use as param
|
|
||||||
|
|
||||||
if (ret != 0) {
|
|
||||||
smaError("failed to create tsdb db env, ret = %d", ret);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t smaCloseDBEnv(TDB *pEnv) { return tdbClose(pEnv); }
|
|
||||||
|
|
||||||
static inline int tdSmaKeyCmpr(const void *arg1, int len1, const void *arg2, int len2) {
|
|
||||||
const SSmaKey *pKey1 = (const SSmaKey *)arg1;
|
|
||||||
const SSmaKey *pKey2 = (const SSmaKey *)arg2;
|
|
||||||
|
|
||||||
ASSERT(len1 == len2 && len1 == sizeof(SSmaKey));
|
|
||||||
|
|
||||||
if (pKey1->skey < pKey2->skey) {
|
|
||||||
return -1;
|
|
||||||
} else if (pKey1->skey > pKey2->skey) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
if (pKey1->groupId < pKey2->groupId) {
|
|
||||||
return -1;
|
|
||||||
} else if (pKey1->groupId > pKey2->groupId) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t smaOpenDBDb(TTB **ppDB, TDB *pEnv, const char *pFName) {
|
|
||||||
tdb_cmpr_fn_t compFunc;
|
|
||||||
|
|
||||||
// Create a database
|
|
||||||
compFunc = tdSmaKeyCmpr;
|
|
||||||
if (tdbTbOpen(pFName, -1, -1, compFunc, pEnv, ppDB) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t smaCloseDBDb(TTB *pDB) { return tdbTbClose(pDB); }
|
|
||||||
|
|
||||||
int32_t smaOpenDBF(TDB *pEnv, SDBFile *pDBF) {
|
|
||||||
// TEnv is shared by a group of SDBFile
|
|
||||||
if (!pEnv || !pDBF) {
|
|
||||||
terrno = TSDB_CODE_INVALID_PTR;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open DBF
|
|
||||||
if (smaOpenDBDb(&(pDBF->pDB), pEnv, pDBF->path) < 0) {
|
|
||||||
smaError("failed to open DBF: %s", pDBF->path);
|
|
||||||
smaCloseDBDb(pDBF->pDB);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t smaCloseDBF(SDBFile *pDBF) {
|
|
||||||
int32_t ret = 0;
|
|
||||||
if (pDBF->pDB) {
|
|
||||||
ret = smaCloseDBDb(pDBF->pDB);
|
|
||||||
pDBF->pDB = NULL;
|
|
||||||
}
|
|
||||||
taosMemoryFreeClear(pDBF->path);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) {
|
|
||||||
int32_t ret;
|
|
||||||
|
|
||||||
printf("save tsma data into %s, keyLen:%d valLen:%d txn:%p\n", pDBF->path, keyLen, valLen, txn);
|
|
||||||
ret = tdbTbUpsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
|
|
||||||
if (ret < 0) {
|
|
||||||
smaError("failed to upsert tsma data into db, ret = %d", ret);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *smaGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_t *valLen) {
|
|
||||||
void *pVal = NULL;
|
|
||||||
int ret;
|
|
||||||
|
|
||||||
ret = tdbTbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
|
|
||||||
|
|
||||||
if (ret < 0) {
|
|
||||||
smaError("failed to get tsma data from db, ret = %d", ret);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(*valLen >= 0);
|
|
||||||
|
|
||||||
// TODO: lock?
|
|
||||||
// TODO: Would the key/value be destoryed during return the data?
|
|
||||||
// TODO: How about the key is updated while value length is changed? The original value buffer would be freed
|
|
||||||
// automatically?
|
|
||||||
|
|
||||||
return pVal;
|
|
||||||
}
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -238,9 +238,6 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
|
if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
|
||||||
|
|
||||||
if (tdUpdateExpireWindow(pTq->pVnode->pSma, msg, ver) != 0) {
|
|
||||||
// TODO handle sma error
|
|
||||||
}
|
|
||||||
void* data = taosMemoryMalloc(msgLen);
|
void* data = taosMemoryMalloc(msgLen);
|
||||||
if (data == NULL) {
|
if (data == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -150,7 +150,7 @@ int32_t tsdbCommit(STsdb *pTsdb) {
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -176,13 +176,13 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
||||||
|
|
||||||
pMemTable->nDelOp++;
|
pMemTable->nDelOp++;
|
||||||
|
|
||||||
tsdbError("vgId:%d delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
||||||
" since %s",
|
" since %s",
|
||||||
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
|
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
||||||
" since %s",
|
" since %s",
|
||||||
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
|
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -225,6 +225,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
vTrace("message in fetch queue is processing");
|
vTrace("message in fetch queue is processing");
|
||||||
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_VND_FETCH:
|
case TDMT_VND_FETCH:
|
||||||
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||||
|
@ -236,13 +237,10 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||||
case TDMT_VND_QUERY_HEARTBEAT:
|
case TDMT_VND_QUERY_HEARTBEAT:
|
||||||
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||||
|
|
||||||
case TDMT_VND_TABLE_META:
|
case TDMT_VND_TABLE_META:
|
||||||
return vnodeGetTableMeta(pVnode, pMsg);
|
return vnodeGetTableMeta(pVnode, pMsg);
|
||||||
|
|
||||||
case TDMT_VND_CONSUME:
|
case TDMT_VND_CONSUME:
|
||||||
return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
|
return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
|
||||||
|
|
||||||
case TDMT_STREAM_TASK_RUN:
|
case TDMT_STREAM_TASK_RUN:
|
||||||
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
|
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_TASK_DISPATCH:
|
case TDMT_STREAM_TASK_DISPATCH:
|
||||||
|
@ -279,7 +277,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRp
|
||||||
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||||
// TODO
|
// TODO
|
||||||
|
|
||||||
// blockDebugShowData(data);
|
// blockDebugShowData(data, __func__);
|
||||||
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -373,7 +373,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
||||||
pTsdb->pTfs = tfsOpen(&pDisks, numOfDisks);
|
pTsdb->pTfs = tfsOpen(&pDisks, numOfDisks);
|
||||||
EXPECT_NE(pTsdb->pTfs, nullptr);
|
EXPECT_NE(pTsdb->pTfs, nullptr);
|
||||||
|
|
||||||
// generate SSubmitReq msg and update expired window
|
// generate SSubmitReq msg and update expire window
|
||||||
int16_t schemaVer = 0;
|
int16_t schemaVer = 0;
|
||||||
uint32_t mockRowLen = sizeof(STSRow);
|
uint32_t mockRowLen = sizeof(STSRow);
|
||||||
uint32_t mockRowNum = 2;
|
uint32_t mockRowNum = 2;
|
||||||
|
|
|
@ -353,9 +353,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag valu
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECREATED, "Table re-created")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECREATED, "Table re-created")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta")
|
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_SMA_STAT, "Invalid sma state")
|
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TSMA_ALREADY_EXIST, "TSMA already exists")
|
|
||||||
|
|
||||||
|
|
||||||
// query
|
// query
|
||||||
|
@ -557,6 +554,19 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PRECISION_TYPE, "Invalid timestamp pre
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DATA, "Invalid data type")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DATA, "Invalid data type")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DB_CONF, "Invalid schemaless db config")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DB_CONF, "Invalid schemaless db config")
|
||||||
|
|
||||||
|
//tsma
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_ALREADY_EXIST, "Tsma already exists")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_META, "No tsma index in meta")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_ENV, "Invalid tsma env")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_STAT, "Invalid tsma state")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in cache")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_RM_SKEY_IN_HASH, "Rm tsma skey in cache")
|
||||||
|
|
||||||
|
//rsma
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
|
||||||
|
|
||||||
|
|
||||||
#ifdef TAOS_ERROR_C
|
#ifdef TAOS_ERROR_C
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
|
Loading…
Reference in New Issue