feat: create tsma result stable
This commit is contained in:
parent
328cd92094
commit
d4c33fba2f
|
@ -98,6 +98,7 @@ extern char *qtypeStr[];
|
|||
#undef TD_DEBUG_PRINT_ROW
|
||||
#undef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
||||
#undef TD_DEBUG_PRINT_TAG
|
||||
#define TD_DEBUG_SMA_ID 123456
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -2329,24 +2329,28 @@ typedef struct {
|
|||
} SVgEpSet;
|
||||
|
||||
typedef struct {
|
||||
int8_t version; // for compatibility(default 0)
|
||||
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
|
||||
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
|
||||
int8_t timezoneInt; // sma data expired if timezone changes.
|
||||
int32_t dstVgId;
|
||||
char indexName[TSDB_INDEX_NAME_LEN];
|
||||
int32_t exprLen;
|
||||
int32_t tagsFilterLen;
|
||||
int32_t numOfVgroups;
|
||||
int64_t indexUid;
|
||||
tb_uid_t tableUid; // super/child/common table uid
|
||||
int64_t interval;
|
||||
int64_t offset; // use unit by precision of DB
|
||||
int64_t sliding;
|
||||
char* expr; // sma expression
|
||||
char* tagsFilter;
|
||||
SVgEpSet* pVgEpSet;
|
||||
} STSma; // Time-range-wise SMA
|
||||
int8_t version; // for compatibility(default 0)
|
||||
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
|
||||
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
|
||||
int8_t timezoneInt; // sma data expired if timezone changes.
|
||||
int32_t dstVgId;
|
||||
char indexName[TSDB_INDEX_NAME_LEN];
|
||||
int32_t exprLen;
|
||||
int32_t tagsFilterLen;
|
||||
int32_t numOfVgroups; // for dstVgroup
|
||||
int64_t indexUid;
|
||||
tb_uid_t tableUid; // super/child/common table uid
|
||||
tb_uid_t dstTbUid; // for dstVgroup
|
||||
int64_t interval;
|
||||
int64_t offset; // use unit by precision of DB
|
||||
int64_t sliding;
|
||||
char* dstTbName; // for dstVgroup
|
||||
char* expr; // sma expression
|
||||
char* tagsFilter;
|
||||
SVgEpSet* pVgEpSet; // for dstVgroup
|
||||
SSchemaWrapper schemaRow; // for dstVgroup
|
||||
SSchemaWrapper schemaTag; // for dstVgroup
|
||||
} STSma; // Time-range-wise SMA
|
||||
|
||||
typedef STSma SVCreateTSmaReq;
|
||||
|
||||
|
@ -2493,14 +2497,14 @@ int32_t tSerializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq
|
|||
int32_t tDeserializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int8_t intervalUnit;
|
||||
int8_t slidingUnit;
|
||||
int64_t interval;
|
||||
int64_t offset;
|
||||
int64_t sliding;
|
||||
int64_t dstTbUid;
|
||||
int32_t dstVgId; // for stream
|
||||
char* expr;
|
||||
int8_t intervalUnit;
|
||||
int8_t slidingUnit;
|
||||
int64_t interval;
|
||||
int64_t offset;
|
||||
int64_t sliding;
|
||||
int64_t dstTbUid;
|
||||
int32_t dstVgId; // for stream
|
||||
char* expr;
|
||||
} STableIndexInfo;
|
||||
|
||||
typedef struct {
|
||||
|
@ -2510,7 +2514,6 @@ typedef struct {
|
|||
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
|
||||
int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp);
|
||||
|
||||
|
||||
typedef struct {
|
||||
int8_t mqMsgType;
|
||||
int32_t code;
|
||||
|
@ -2751,8 +2754,8 @@ typedef struct {
|
|||
char* msg;
|
||||
} SVDeleteReq;
|
||||
|
||||
int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq);
|
||||
int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq);
|
||||
int32_t tSerializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
|
||||
int32_t tDeserializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int64_t affectedRows;
|
||||
|
|
|
@ -1624,25 +1624,31 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
break;
|
||||
default:
|
||||
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
|
||||
char tv[8] = {0};
|
||||
if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
|
||||
float v = 0;
|
||||
GET_TYPED_DATA(v, float, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
} else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
|
||||
double v = 0;
|
||||
GET_TYPED_DATA(v, double, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
} else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
|
||||
int64_t v = 0;
|
||||
GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
if (pCol->type == pColInfoData->info.type) {
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, var, true, offset,
|
||||
k);
|
||||
} else {
|
||||
uint64_t v = 0;
|
||||
GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
char tv[8] = {0};
|
||||
if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
|
||||
float v = 0;
|
||||
GET_TYPED_DATA(v, float, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
} else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
|
||||
double v = 0;
|
||||
GET_TYPED_DATA(v, double, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
} else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
|
||||
int64_t v = 0;
|
||||
GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
} else {
|
||||
uint64_t v = 0;
|
||||
GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
}
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset,
|
||||
k);
|
||||
}
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset, k);
|
||||
} else {
|
||||
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
|
||||
TASSERT(0);
|
||||
|
|
|
@ -3844,6 +3844,8 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
|
|||
if (tEncodeI32(pCoder, pSma->numOfVgroups) < 0) return -1;
|
||||
if (tEncodeI64(pCoder, pSma->indexUid) < 0) return -1;
|
||||
if (tEncodeI64(pCoder, pSma->tableUid) < 0) return -1;
|
||||
if (tEncodeI64(pCoder, pSma->dstTbUid) < 0) return -1;
|
||||
if (tEncodeCStr(pCoder, pSma->dstTbName) < 0) return -1;
|
||||
if (tEncodeI64(pCoder, pSma->interval) < 0) return -1;
|
||||
if (tEncodeI64(pCoder, pSma->offset) < 0) return -1;
|
||||
if (tEncodeI64(pCoder, pSma->sliding) < 0) return -1;
|
||||
|
@ -3853,17 +3855,24 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
|
|||
if (pSma->tagsFilterLen > 0) {
|
||||
if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1;
|
||||
}
|
||||
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
|
||||
if (tEncodeI32(pCoder, pSma->pVgEpSet[v].vgId) < 0) return -1;
|
||||
if (tEncodeI8(pCoder, pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
|
||||
int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps;
|
||||
if (tEncodeI8(pCoder, numOfEps) < 0) return -1;
|
||||
for (int32_t n = 0; n < numOfEps; ++n) {
|
||||
const SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n];
|
||||
if (tEncodeCStr(pCoder, pEp->fqdn) < 0) return -1;
|
||||
if (tEncodeU16(pCoder, pEp->port) < 0) return -1;
|
||||
|
||||
if (pSma->numOfVgroups) { // only needed in dstVgroup
|
||||
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
|
||||
if (tEncodeI32(pCoder, pSma->pVgEpSet[v].vgId) < 0) return -1;
|
||||
if (tEncodeI8(pCoder, pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
|
||||
int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps;
|
||||
if (tEncodeI8(pCoder, numOfEps) < 0) return -1;
|
||||
for (int32_t n = 0; n < numOfEps; ++n) {
|
||||
const SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n];
|
||||
if (tEncodeCStr(pCoder, pEp->fqdn) < 0) return -1;
|
||||
if (tEncodeU16(pCoder, pEp->port) < 0) return -1;
|
||||
}
|
||||
}
|
||||
|
||||
tEncodeSSchemaWrapper(pCoder, &pSma->schemaRow);
|
||||
tEncodeSSchemaWrapper(pCoder, &pSma->schemaTag);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -3871,14 +3880,16 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
|
|||
if (tDecodeI8(pCoder, &pSma->version) < 0) return -1;
|
||||
if (tDecodeI8(pCoder, &pSma->intervalUnit) < 0) return -1;
|
||||
if (tDecodeI8(pCoder, &pSma->slidingUnit) < 0) return -1;
|
||||
if (tDecodeI32(pCoder, &pSma->dstVgId) < 0) return -1;
|
||||
if (tDecodeI8(pCoder, &pSma->timezoneInt) < 0) return -1;
|
||||
if (tDecodeI32(pCoder, &pSma->dstVgId) < 0) return -1;
|
||||
if (tDecodeCStrTo(pCoder, pSma->indexName) < 0) return -1;
|
||||
if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1;
|
||||
if (tDecodeI32(pCoder, &pSma->tagsFilterLen) < 0) return -1;
|
||||
if (tDecodeI32(pCoder, &pSma->numOfVgroups) < 0) return -1;
|
||||
if (tDecodeI64(pCoder, &pSma->indexUid) < 0) return -1;
|
||||
if (tDecodeI64(pCoder, &pSma->tableUid) < 0) return -1;
|
||||
if (tDecodeI64(pCoder, &pSma->dstTbUid) < 0) return -1;
|
||||
if (tDecodeCStr(pCoder, &pSma->dstTbName) < 0) return -1;
|
||||
if (tDecodeI64(pCoder, &pSma->interval) < 0) return -1;
|
||||
if (tDecodeI64(pCoder, &pSma->offset) < 0) return -1;
|
||||
if (tDecodeI64(pCoder, &pSma->sliding) < 0) return -1;
|
||||
|
@ -3892,7 +3903,7 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
|
|||
} else {
|
||||
pSma->tagsFilter = NULL;
|
||||
}
|
||||
if (pSma->numOfVgroups > 0) {
|
||||
if (pSma->numOfVgroups > 0) { // only needed in dstVgroup
|
||||
pSma->pVgEpSet = (SVgEpSet *)tDecoderMalloc(pCoder, pSma->numOfVgroups * sizeof(SVgEpSet));
|
||||
if (!pSma->pVgEpSet) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -3912,6 +3923,9 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
|
|||
if (tDecodeU16(pCoder, &pEp->port) < 0) return -1;
|
||||
}
|
||||
}
|
||||
|
||||
tDecodeSSchemaWrapperEx(pCoder, &pSma->schemaRow);
|
||||
tDecodeSSchemaWrapperEx(pCoder, &pSma->schemaTag);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -298,31 +298,34 @@ typedef struct {
|
|||
} SVgObj;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
char stb[TSDB_TABLE_FNAME_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int64_t createdTime;
|
||||
int64_t uid;
|
||||
int64_t stbUid;
|
||||
int64_t dbUid;
|
||||
int8_t intervalUnit;
|
||||
int8_t slidingUnit;
|
||||
int8_t timezone;
|
||||
int32_t dstVgId; // for stream
|
||||
int64_t dstTbUid;
|
||||
int64_t interval;
|
||||
int64_t offset;
|
||||
int64_t sliding;
|
||||
int32_t exprLen; // strlen + 1
|
||||
int32_t tagsFilterLen;
|
||||
int32_t sqlLen;
|
||||
int32_t astLen;
|
||||
int32_t numOfVgroups;
|
||||
char* expr;
|
||||
char* tagsFilter;
|
||||
char* sql;
|
||||
char* ast;
|
||||
SVgEpSet* pVgEpSet;
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
char stb[TSDB_TABLE_FNAME_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
char dstTbName[TSDB_TABLE_FNAME_LEN];
|
||||
int64_t createdTime;
|
||||
int64_t uid;
|
||||
int64_t stbUid;
|
||||
int64_t dbUid;
|
||||
int64_t dstTbUid;
|
||||
int8_t intervalUnit;
|
||||
int8_t slidingUnit;
|
||||
int8_t timezone;
|
||||
int32_t dstVgId; // for stream
|
||||
int64_t interval;
|
||||
int64_t offset;
|
||||
int64_t sliding;
|
||||
int32_t exprLen; // strlen + 1
|
||||
int32_t tagsFilterLen;
|
||||
int32_t sqlLen;
|
||||
int32_t astLen;
|
||||
int32_t numOfVgroups; // for dstVgroup
|
||||
char* expr;
|
||||
char* tagsFilter;
|
||||
char* sql;
|
||||
char* ast;
|
||||
SVgEpSet* pVgEpSet; // for dstVgroup
|
||||
SSchemaWrapper schemaRow; // for dstVgroup
|
||||
SSchemaWrapper schemaTag; // for dstVgroup
|
||||
} SSmaObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "parser.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define TSDB_SMA_VER_NUMBER 1
|
||||
|
@ -82,10 +83,12 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
|
|||
SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_DB_FNAME_LEN, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pSma->createdTime, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pSma->uid, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pSma->stbUid, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pSma->dbUid, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pSma->dstTbUid, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pSma->intervalUnit, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pSma->slidingUnit, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pSma->timezone, _OVER)
|
||||
|
@ -147,10 +150,12 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_DB_FNAME_LEN, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pSma->createdTime, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pSma->uid, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pSma->stbUid, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pSma->dbUid, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pSma->dstTbUid, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pSma->intervalUnit, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pSma->slidingUnit, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pSma->timezone, _OVER)
|
||||
|
@ -260,6 +265,8 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm
|
|||
req.tagsFilterLen = pSma->tagsFilterLen;
|
||||
req.indexUid = pSma->uid;
|
||||
req.tableUid = pSma->stbUid;
|
||||
req.dstVgId = pSma->dstVgId;
|
||||
req.dstTbUid = pSma->dstTbUid;
|
||||
req.interval = pSma->interval;
|
||||
req.offset = pSma->offset;
|
||||
req.sliding = pSma->sliding;
|
||||
|
@ -267,7 +274,10 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm
|
|||
req.tagsFilter = pSma->tagsFilter;
|
||||
req.numOfVgroups = pSma->numOfVgroups;
|
||||
req.pVgEpSet = pSma->pVgEpSet;
|
||||
|
||||
req.schemaRow = pSma->schemaRow;
|
||||
req.schemaTag = pSma->schemaTag;
|
||||
req.dstTbName = pSma->dstTbName;
|
||||
|
||||
// get length
|
||||
int32_t ret = 0;
|
||||
tEncodeSize(tEncodeSVCreateTSmaReq, &req, contLen, ret);
|
||||
|
@ -425,15 +435,43 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
|
|||
mndReleaseDnode(pMnode, pDnode);
|
||||
|
||||
// todo add sma info here
|
||||
#if 1
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(pSma->ast, &pAst) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (qExtractResultSchema(pAst, &pSma->schemaRow.nCols, &pSma->schemaRow.pSchema) != 0) {
|
||||
nodesDestroyNode(pAst);
|
||||
return -1;
|
||||
}
|
||||
nodesDestroyNode(pAst);
|
||||
pSma->schemaRow.version = 1;
|
||||
|
||||
// TODO: the schemaTag generated by qExtractResultXXX later.
|
||||
pSma->schemaTag.nCols = 1;
|
||||
pSma->schemaTag.version = 1;
|
||||
pSma->schemaTag.pSchema = taosMemoryCalloc(1, sizeof(SSchema));
|
||||
if (!pSma->schemaTag.pSchema) {
|
||||
nodesDestroyNode(pAst);
|
||||
return -1;
|
||||
}
|
||||
pSma->schemaTag.pSchema[0].type = TSDB_DATA_TYPE_BIGINT;
|
||||
pSma->schemaTag.pSchema[0].bytes = TYPE_BYTES[TSDB_DATA_TYPE_BIGINT];
|
||||
pSma->schemaTag.pSchema[0].colId = pSma->schemaRow.nCols + PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
pSma->schemaTag.pSchema[0].flags = 0;
|
||||
snprintf(pSma->schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId");
|
||||
|
||||
SVgEpSet *pVgEpSet = NULL;
|
||||
int32_t numOfVgroups = 0;
|
||||
if (mndSmaGetVgEpSet(pMnode, pDb, &pVgEpSet, &numOfVgroups) != 0) {
|
||||
nodesDestroyNode(pAst);
|
||||
return -1;
|
||||
}
|
||||
nodesDestroyNode(pAst);
|
||||
|
||||
pSma->pVgEpSet = pVgEpSet;
|
||||
pSma->numOfVgroups = numOfVgroups;
|
||||
|
||||
#endif
|
||||
int32_t smaContLen = 0;
|
||||
void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
|
||||
if (pSmaReq == NULL) return -1;
|
||||
|
@ -463,13 +501,20 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN);
|
||||
memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
smaObj.createdTime = taosGetTimestampMs();
|
||||
#if 0
|
||||
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
#endif
|
||||
smaObj.uid = TD_DEBUG_SMA_ID;
|
||||
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
|
||||
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name);
|
||||
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
|
||||
smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
|
||||
smaObj.stbUid = pStb->uid;
|
||||
smaObj.dbUid = pStb->dbUid;
|
||||
smaObj.intervalUnit = pCreate->intervalUnit;
|
||||
smaObj.slidingUnit = pCreate->slidingUnit;
|
||||
smaObj.timezone = pCreate->timezone;
|
||||
smaObj.dstVgId = pCreate->dstVgId;
|
||||
// smaObj.dstVgId = pCreate->dstVgId;
|
||||
smaObj.interval = pCreate->interval;
|
||||
smaObj.offset = pCreate->offset;
|
||||
smaObj.sliding = pCreate->sliding;
|
||||
|
@ -502,6 +547,42 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
memcpy(smaObj.ast, pCreate->ast, smaObj.astLen);
|
||||
}
|
||||
|
||||
#if 1 // only for debugging, not needed in common vgroups, only needed in dstVgroup.
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(smaObj.ast, &pAst) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (qExtractResultSchema(pAst, &smaObj.schemaRow.nCols, &smaObj.schemaRow.pSchema) != 0) {
|
||||
nodesDestroyNode(pAst);
|
||||
return -1;
|
||||
}
|
||||
smaObj.schemaRow.version = 1;
|
||||
|
||||
smaObj.schemaTag.nCols = 1;
|
||||
smaObj.schemaTag.version = 1;
|
||||
smaObj.schemaTag.pSchema = taosMemoryCalloc(1, sizeof(SSchema));
|
||||
if (!smaObj.schemaTag.pSchema) {
|
||||
nodesDestroyNode(pAst);
|
||||
return -1;
|
||||
}
|
||||
smaObj.schemaTag.pSchema[0].type = TSDB_DATA_TYPE_BIGINT;
|
||||
smaObj.schemaTag.pSchema[0].bytes = TYPE_BYTES[TSDB_DATA_TYPE_BIGINT];
|
||||
smaObj.schemaTag.pSchema[0].colId = smaObj.schemaRow.nCols + PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
smaObj.schemaTag.pSchema[0].flags = 0;
|
||||
snprintf(smaObj.schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId");
|
||||
|
||||
nodesDestroyNode(pAst);
|
||||
|
||||
SVgEpSet *pVgEpSet = NULL;
|
||||
int32_t numOfVgroups = 0;
|
||||
if (mndSmaGetVgEpSet(pMnode, pDb, &pVgEpSet, &numOfVgroups) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
smaObj.pVgEpSet = pVgEpSet;
|
||||
smaObj.numOfVgroups = numOfVgroups;
|
||||
#endif
|
||||
|
||||
SStreamObj streamObj = {0};
|
||||
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
|
|
|
@ -252,8 +252,12 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
|
|||
}
|
||||
|
||||
if (qExtractResultSchema(pAst, (int32_t *)&pStream->outputSchema.nCols, &pStream->outputSchema.pSchema) != 0) {
|
||||
nodesDestroyNode(pAst);
|
||||
return -1;
|
||||
}
|
||||
// free
|
||||
nodesDestroyNode(pAst);
|
||||
|
||||
|
||||
#if 0
|
||||
printf("|");
|
||||
|
|
|
@ -34,13 +34,13 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
|
|||
SMetaReader mr = {0};
|
||||
|
||||
// validate req
|
||||
// save smaIndex
|
||||
metaReaderInit(&mr, pMeta, 0);
|
||||
if (metaGetTableEntryByUid(&mr, pCfg->indexUid) == 0) {
|
||||
// TODO: just for pass case
|
||||
#if 1
|
||||
terrno = TSDB_CODE_TDB_TSMA_ALREADY_EXIST;
|
||||
metaReaderClear(&mr);
|
||||
return -1;
|
||||
return -1; // don't goto _err;
|
||||
#else
|
||||
metaReaderClear(&mr);
|
||||
return 0;
|
||||
|
|
|
@ -167,13 +167,13 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
|
|||
*/
|
||||
int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
|
||||
SSma *pSma = pVnode->pSma;
|
||||
SMeta *pMeta = pVnode->pMeta;
|
||||
SMsgCb *pMsgCb = &pVnode->msgCb;
|
||||
if (!pReq->rollup) {
|
||||
smaTrace("vgId:%d, return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SMeta *pMeta = pVnode->pMeta;
|
||||
SMsgCb *pMsgCb = &pVnode->msgCb;
|
||||
SRSmaParam *param = &pReq->pRSmaParam;
|
||||
|
||||
if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) {
|
||||
|
|
|
@ -363,8 +363,8 @@ static int32_t tdGetSmaStorageLevel(STSmaKeepCfg *pCfg, int64_t interval) {
|
|||
*/
|
||||
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||
STsdbCfg *pCfg = SMA_TSDB_CFG(pSma);
|
||||
#if 0
|
||||
const SArray *pDataBlocks = (const SArray *)msg;
|
||||
int64_t testSkey = TSKEY_INITIAL_VAL;
|
||||
|
||||
// TODO: destroy SSDataBlocks(msg)
|
||||
|
||||
|
@ -386,6 +386,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
|||
smaWarn("vgId:%d insert tSma data failed since pDataBlocks is empty", SMA_VID(pSma));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
#endif
|
||||
|
||||
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||
|
@ -403,178 +404,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
STSma *pTSma = pItem->pTSma;
|
||||
STSmaWriteH tSmaH = {0};
|
||||
STSma *pTSma = pItem->pTSma;
|
||||
|
||||
if (tdInitTSmaWriteH(&tSmaH, pSma, pDataBlocks, pTSma->interval, pTSma->intervalUnit) != 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
char rPath[TSDB_FILENAME_LEN] = {0};
|
||||
char aPath[TSDB_FILENAME_LEN] = {0};
|
||||
snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
|
||||
tfsAbsoluteName(SMA_TFS(pSma), SMA_ENV_DID(pEnv), rPath, aPath);
|
||||
if (!taosCheckExistFile(aPath)) {
|
||||
if (tfsMkdirRecurAt(SMA_TFS(pSma), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) {
|
||||
tdUnRefSmaStat(pSma, pStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
// Step 1: Judge the storage level and days
|
||||
int32_t storageLevel = tdGetSmaStorageLevel(pCfg, tSmaH.interval);
|
||||
int32_t minutePerFile = tdGetTSmaDays(pSma, tSmaH.interval, storageLevel);
|
||||
|
||||
char smaKey[SMA_KEY_LEN] = {0}; // key: skey + groupId
|
||||
char dataBuf[512] = {0}; // val: aggr data // TODO: handle 512 buffer?
|
||||
void *pDataBuf = NULL;
|
||||
int32_t sz = taosArrayGetSize(pDataBlocks);
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
SSDataBlock *pDataBlock = taosArrayGet(pDataBlocks, i);
|
||||
int32_t colNum = pDataBlock->info.numOfCols;
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
int32_t rowSize = pDataBlock->info.rowSize;
|
||||
int64_t groupId = pDataBlock->info.groupId;
|
||||
for (int32_t j = 0; j < rows; ++j) {
|
||||
printf("|");
|
||||
TSKEY skey = TSKEY_INITIAL_VAL; // the start key of TS window by interval
|
||||
void *pSmaKey = &smaKey;
|
||||
bool isStartKey = false;
|
||||
|
||||
int32_t tlen = 0; // reset the len
|
||||
pDataBuf = &dataBuf; // reset the buf
|
||||
for (int32_t k = 0; k < colNum; ++k) {
|
||||
SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||
void *var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
||||
switch (pColInfoData->info.type) {
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
if (!isStartKey) {
|
||||
isStartKey = true;
|
||||
skey = *(TSKEY *)var;
|
||||
testSkey = skey;
|
||||
printf("= skey %" PRIi64 " groupId = %" PRIi64 "|", skey, groupId);
|
||||
tdEncodeTSmaKey(groupId, skey, &pSmaKey);
|
||||
} else {
|
||||
printf(" %" PRIi64 " |", *(int64_t *)var);
|
||||
tlen += taosEncodeFixedI64(&pDataBuf, *(int64_t *)var);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
printf(" %15d |", *(uint8_t *)var);
|
||||
tlen += taosEncodeFixedU8(&pDataBuf, *(uint8_t *)var);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
printf(" %15d |", *(int8_t *)var);
|
||||
tlen += taosEncodeFixedI8(&pDataBuf, *(int8_t *)var);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
printf(" %15d |", *(int16_t *)var);
|
||||
tlen += taosEncodeFixedI16(&pDataBuf, *(int16_t *)var);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
printf(" %15d |", *(uint16_t *)var);
|
||||
tlen += taosEncodeFixedU16(&pDataBuf, *(uint16_t *)var);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
printf(" %15d |", *(int32_t *)var);
|
||||
tlen += taosEncodeFixedI32(&pDataBuf, *(int32_t *)var);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
printf(" %15f |", *(float *)var);
|
||||
tlen += taosEncodeBinary(&pDataBuf, var, sizeof(float));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
printf(" %15u |", *(uint32_t *)var);
|
||||
tlen += taosEncodeFixedU32(&pDataBuf, *(uint32_t *)var);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
printf(" %15ld |", *(int64_t *)var);
|
||||
tlen += taosEncodeFixedI64(&pDataBuf, *(int64_t *)var);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
printf(" %15lf |", *(double *)var);
|
||||
tlen += taosEncodeBinary(&pDataBuf, var, sizeof(double));
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
printf(" %15lu |", *(uint64_t *)var);
|
||||
tlen += taosEncodeFixedU64(&pDataBuf, *(uint64_t *)var);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR: {
|
||||
char tmpChar[100] = {0};
|
||||
strncpy(tmpChar, varDataVal(var), varDataLen(var));
|
||||
printf(" %s |", tmpChar);
|
||||
tlen += taosEncodeBinary(&pDataBuf, varDataVal(var), varDataLen(var));
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
|
||||
char tmpChar[100] = {0};
|
||||
strncpy(tmpChar, varDataVal(var), varDataLen(var));
|
||||
printf(" %s |", tmpChar);
|
||||
tlen += taosEncodeBinary(&pDataBuf, varDataVal(var), varDataLen(var));
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
// TODO: add binary/varbinary
|
||||
TASSERT(0);
|
||||
default:
|
||||
printf("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
|
||||
TASSERT(0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
printf("\n");
|
||||
// if ((tlen > 0) && (skey != TSKEY_INITIAL_VAL)) {
|
||||
if (tlen > 0) {
|
||||
int32_t fid = (int32_t)(TSDB_KEY_FID(skey, minutePerFile, pCfg->precision));
|
||||
|
||||
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index
|
||||
// file
|
||||
// - Set and open the DFile or the B+Tree file
|
||||
// TODO: tsdbStartTSmaCommit();
|
||||
if (fid != tSmaH.dFile.fid) {
|
||||
if (tSmaH.dFile.fid != SMA_IVLD_FID) {
|
||||
tdSmaEndCommit(pEnv);
|
||||
smaCloseDBF(&tSmaH.dFile);
|
||||
}
|
||||
tdSetTSmaDataFile(&tSmaH, indexUid, fid);
|
||||
smaDebug("@@@ vgId:%d write to DBF %s, days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi32
|
||||
" queryKey:%" PRIi64,
|
||||
SMA_VID(pSma), tSmaH.dFile.path, minutePerFile, tSmaH.interval, storageLevel, testSkey);
|
||||
if (smaOpenDBF(pEnv->dbEnv, &tSmaH.dFile) != 0) {
|
||||
smaWarn("vgId:%d open DB file %s failed since %s", SMA_VID(pSma),
|
||||
tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
|
||||
tdDestroyTSmaWriteH(&tSmaH);
|
||||
tdUnRefSmaStat(pSma, pStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
tdSmaBeginCommit(pEnv);
|
||||
}
|
||||
|
||||
if (tdInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) {
|
||||
smaWarn("vgId:%d insert tsma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64
|
||||
" since %s",
|
||||
SMA_VID(pSma), indexUid, skey, groupId, tstrerror(terrno));
|
||||
tdSmaEndCommit(pEnv);
|
||||
tdDestroyTSmaWriteH(&tSmaH);
|
||||
tdUnRefSmaStat(pSma, pStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
smaDebug("vgId:%d insert tsma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64,
|
||||
SMA_VID(pSma), indexUid, skey, groupId);
|
||||
// TODO:tsdbEndTSmaCommit();
|
||||
|
||||
// Step 3: reset the SSmaStat
|
||||
tdResetExpiredWindow(pSma, pStat, indexUid, skey);
|
||||
} else {
|
||||
smaWarn("vgId:%d invalid data skey:%" PRIi64 ", tlen %" PRIi32 " during insert tSma data for %" PRIi64,
|
||||
SMA_VID(pSma), skey, tlen, indexUid);
|
||||
}
|
||||
}
|
||||
}
|
||||
tdSmaEndCommit(pEnv); // TODO: not commit for every insert
|
||||
tdDestroyTSmaWriteH(&tSmaH);
|
||||
tdUnRefSmaStat(pSma, pStat);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -865,6 +696,19 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
|
||||
// create stable to save tsma result in dstVgId
|
||||
SVCreateStbReq pReq = {0};
|
||||
pReq.name = pCfg->dstTbName;
|
||||
pReq.suid = pCfg->dstTbUid;
|
||||
pReq.schemaRow = pCfg->schemaRow;
|
||||
pReq.schemaTag = pCfg->schemaTag;
|
||||
|
||||
if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
tdTSmaAdd(pSma, 1);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -241,13 +241,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
if (tdUpdateExpireWindow(pTq->pVnode->pSma, msg, ver) != 0) {
|
||||
// TODO handle sma error
|
||||
}
|
||||
void* data = taosMemoryMalloc(msgLen);
|
||||
if (data == NULL) {
|
||||
return -1;
|
||||
}
|
||||
memcpy(data, msg, msgLen);
|
||||
// void* data = taosMemoryMalloc(msgLen);
|
||||
// if (data == NULL) {
|
||||
// return -1;
|
||||
// }
|
||||
// memcpy(data, msg, msgLen);
|
||||
|
||||
tqProcessStreamTrigger(pTq, data);
|
||||
// tqProcessStreamTrigger(pTq, data);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -174,11 +174,12 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
}
|
||||
|
||||
vTrace("vgId:%d, process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
|
||||
|
||||
#if 1
|
||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
||||
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
// commit if need
|
||||
if (vnodeShouldCommit(pVnode)) {
|
||||
|
@ -278,8 +279,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRp
|
|||
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||
// TODO
|
||||
|
||||
// blockDebugShowData(data);
|
||||
// blockDebugShowData(data, __func__);
|
||||
#if 0
|
||||
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
||||
#endif
|
||||
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, TD_DEBUG_SMA_ID, NULL);
|
||||
}
|
||||
|
||||
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
||||
|
|
Loading…
Reference in New Issue